Skip to content

Commit 85a5cbc

Browse files
authored
Merge pull request #8783 from GGraziadei/8710-flux-component-conf
Allow per-component configuration in Flux topology
2 parents 6697735 + 0e49fd6 commit 85a5cbc

11 files changed

Lines changed: 452 additions & 48 deletions

File tree

docs/Serialization.md

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,32 @@ You can also enable it topology-wide (or cluster-wide via `storm.yaml`) by setti
9090

9191
#### Flux
9292

93-
> **Note:** With [Flux](flux.html), only **topology-wide** enablement is currently possible. Flux has no per-component configuration mechanism — `FluxBuilder` applies only parallelism, number of tasks, memory/CPU load, and groupings to the underlying declarers, and the `config:` block is topology-scoped. There is no Flux equivalent of `declarer.addConfiguration(...)`, so the per-component approach recommended above cannot be expressed in a Flux YAML definition.
93+
[Flux](flux.html) supports per-component configuration. In addition to parallelism, number of tasks, memory/CPU load, and groupings, each spout and bolt definition accepts a `config:` block that `FluxBuilder` applies to the underlying declarer via `addConfigurations(...)`. This is the Flux equivalent of `declarer.addConfiguration(...)`, so you can enable compression for just the components that emit large tuples:
9494

95-
To enable compression for a Flux topology, set it in the topology-level `config:` block:
95+
```yaml
96+
spouts:
97+
- id: "file-read-spout"
98+
className: "org.apache.storm.perf.spout.FileReadSpout"
99+
parallelism: 1
100+
# enable compression for this spout only
101+
config:
102+
topology.tuple.compression.enable: true
103+
104+
bolts:
105+
- id: "split"
106+
className: "org.apache.storm.perf.bolt.SplitSentenceBolt"
107+
parallelism: 1
108+
```
109+
110+
You can also enable it topology-wide by setting it in the topology-level `config:`
96111

97112
```yaml
98113
config:
99114
topology.tuple.compression.enable: true
100115
topology.tuple.compression.threshold: 1460
101116
```
102117

103-
Be aware that this enables compression for *every* remote-bound tuple in the topology that exceeds the threshold.
118+
Be aware that the topology-wide form enables compression for *every* remote-bound tuple in the topology that exceeds the threshold.
104119

105120
#### Configuration reference
106121

flux/README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,17 @@ component when the topology is deployed.
574574
Because spout and bolt definitions extend `component` they support constructor arguments, references, and properties as
575575
well.
576576

577+
In addition to `parallelism`, spout and bolt definitions support the following optional parameters that map directly to
578+
the underlying Storm `BoltDeclarer`/`SpoutDeclarer`:
579+
580+
| Parameter | Description |
581+
|---------------------|------------------------------------------------------------------------------------------|
582+
| `numTasks` | The number of tasks for the component (`setNumTasks`). |
583+
| `onHeapMemoryLoad` | The on-heap memory load, in MB, for resource-aware scheduling (`setMemoryLoad`). |
584+
| `offHeapMemoryLoad` | The off-heap memory load, in MB, for resource-aware scheduling (`setMemoryLoad`). |
585+
| `cpuLoad` | The CPU load for resource-aware scheduling (`setCPULoad`). |
586+
| `config` | A map of configuration parameters applied only to this component (`addConfigurations`). |
587+
577588
Shell spout example:
578589

579590
```yaml
@@ -656,6 +667,35 @@ bolts:
656667
parallelism: 1
657668
# ...
658669
```
670+
671+
### Per-Component Configuration
672+
In addition to the topology-wide [Topology Config](#topology-config), each spout and bolt can declare its own `config`
673+
map. These configurations are applied to that component only, via the declarer's `addConfigurations(...)` method,
674+
following Storm's native support for component-level configuration. This avoids enabling a configuration topology-wide
675+
when only a single component requires it.
676+
677+
```yaml
678+
spouts:
679+
- id: "sentence-spout"
680+
className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
681+
parallelism: 1
682+
# configuration applied to this spout only
683+
config:
684+
topology.max.spout.pending: 1000
685+
686+
bolts:
687+
- id: "log"
688+
className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
689+
parallelism: 1
690+
# configuration applied to this bolt only
691+
config:
692+
topology.tuple.compression.enable: true
693+
```
694+
695+
Known Storm configuration keys are validated when the topology is built, so an invalid value (for example, a
696+
non-boolean value for `topology.tuple.compression.enable`) fails fast rather than at submission time. Unknown/custom
697+
keys are not validated and are passed through verbatim; validation runs client-side at build time.
698+
659699
## Streams and Stream Groupings
660700
Streams in Flux are represented as a list of connections (Graph edges, data flow, etc.) between the Spouts and Bolts in
661701
a topology, with an associated Grouping definition.

flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.storm.grouping.CustomStreamGrouping;
4949
import org.apache.storm.hooks.IWorkerHook;
5050
import org.apache.storm.topology.BoltDeclarer;
51+
import org.apache.storm.topology.ComponentConfigurationDeclarer;
5152
import org.apache.storm.topology.IBasicBolt;
5253
import org.apache.storm.topology.IRichBolt;
5354
import org.apache.storm.topology.IRichSpout;
@@ -57,11 +58,12 @@
5758
import org.apache.storm.topology.TopologyBuilder;
5859
import org.apache.storm.tuple.Fields;
5960
import org.apache.storm.utils.Utils;
61+
import org.apache.storm.validation.ConfigValidation;
6062
import org.slf4j.Logger;
6163
import org.slf4j.LoggerFactory;
6264

6365
public class FluxBuilder {
64-
private static Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);
66+
private static final Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);
6567

6668

6769
/**
@@ -72,7 +74,10 @@ public class FluxBuilder {
7274
public static Config buildConfig(TopologyDef topologyDef) {
7375
// merge contents of `config` into topology config
7476
Config conf = new Config();
75-
conf.putAll(topologyDef.getConfig());
77+
Map<String, Object> topologyConfig = topologyDef.getConfig();
78+
// validate the topology-wide config so invalid values fail fast
79+
ConfigValidation.validateFields(topologyConfig);
80+
conf.putAll(topologyConfig);
7681
return conf;
7782
}
7883

@@ -185,55 +190,40 @@ private static void buildStreamDefinitions(ExecutionContext context, TopologyBui
185190
for (StreamDef stream : topologyDef.getStreams()) {
186191
Object boltObj = context.getBolt(stream.getTo());
187192
BoltDeclarer declarer = declarers.get(stream.getTo());
188-
if (boltObj instanceof IRichBolt) {
189-
if (declarer == null) {
190-
declarer = builder.setBolt(stream.getTo(),
191-
(IRichBolt) boltObj,
193+
boolean newDeclarer = declarer == null;
194+
if (newDeclarer) {
195+
declarer = switch (boltObj) {
196+
case IRichBolt b -> builder.setBolt(stream.getTo(), b,
192197
topologyDef.parallelismForBolt(stream.getTo()));
193-
declarers.put(stream.getTo(), declarer);
194-
}
195-
} else if (boltObj instanceof IBasicBolt) {
196-
if (declarer == null) {
197-
declarer = builder.setBolt(
198-
stream.getTo(),
199-
(IBasicBolt) boltObj,
198+
case IBasicBolt b -> builder.setBolt(stream.getTo(), b,
200199
topologyDef.parallelismForBolt(stream.getTo()));
201-
declarers.put(stream.getTo(), declarer);
202-
}
203-
} else if (boltObj instanceof IWindowedBolt) {
204-
if (declarer == null) {
205-
declarer = builder.setBolt(
206-
stream.getTo(),
207-
(IWindowedBolt) boltObj,
200+
case IWindowedBolt b -> builder.setBolt(stream.getTo(), b,
208201
topologyDef.parallelismForBolt(stream.getTo()));
209-
declarers.put(stream.getTo(), declarer);
210-
}
211-
} else if (boltObj instanceof IStatefulBolt) {
212-
if (declarer == null) {
213-
declarer = builder.setBolt(
214-
stream.getTo(),
215-
(IStatefulBolt) boltObj,
202+
case IStatefulBolt b -> builder.setBolt(stream.getTo(), b,
216203
topologyDef.parallelismForBolt(stream.getTo()));
217-
declarers.put(stream.getTo(), declarer);
204+
default -> throw new IllegalArgumentException("Class does not appear to be a bolt: "
205+
+ boltObj.getClass().getName());
206+
};
207+
// resource and config declarations apply to the bolt as a whole, so only apply them once
208+
// when the declarer is first created rather than on every incoming stream
209+
BoltDef boltDef = topologyDef.getBoltDef(stream.getTo());
210+
if (boltDef.getOnHeapMemoryLoad() > -1) {
211+
if (boltDef.getOffHeapMemoryLoad() > -1) {
212+
declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad(), boltDef.getOffHeapMemoryLoad());
213+
} else {
214+
declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad());
215+
}
218216
}
219-
} else {
220-
throw new IllegalArgumentException("Class does not appear to be a bolt: "
221-
+ boltObj.getClass().getName());
222-
}
223-
224-
BoltDef boltDef = topologyDef.getBoltDef(stream.getTo());
225-
if (boltDef.getOnHeapMemoryLoad() > -1) {
226-
if (boltDef.getOffHeapMemoryLoad() > -1) {
227-
declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad(), boltDef.getOffHeapMemoryLoad());
228-
} else {
229-
declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad());
217+
if (boltDef.getCpuLoad() > -1) {
218+
declarer.setCPULoad(boltDef.getCpuLoad());
230219
}
231-
}
232-
if (boltDef.getCpuLoad() > -1) {
233-
declarer.setCPULoad(boltDef.getCpuLoad());
234-
}
235-
if (boltDef.getNumTasks() > -1) {
236-
declarer.setNumTasks(boltDef.getNumTasks());
220+
if (boltDef.getNumTasks() > -1) {
221+
declarer.setNumTasks(boltDef.getNumTasks());
222+
}
223+
applyComponentConfig(boltDef.getConfig(), declarer);
224+
225+
// persist in declares cache
226+
declarers.put(stream.getTo(), declarer);
237227
}
238228

239229
GroupingDef grouping = stream.getGrouping();
@@ -456,6 +446,7 @@ private static void buildSpouts(ExecutionContext context, TopologyBuilder builde
456446
if (sd.getNumTasks() > -1) {
457447
declarer.setNumTasks(sd.getNumTasks());
458448
}
449+
applyComponentConfig(sd.getConfig(), declarer);
459450

460451
context.addSpout(sd.getId(), spout);
461452
}
@@ -470,6 +461,14 @@ private static IRichSpout buildSpout(SpoutDef def, ExecutionContext context) thr
470461
return (IRichSpout) buildObject(def, context);
471462
}
472463

464+
private static void applyComponentConfig(Map<String, Object> config, ComponentConfigurationDeclarer declarer) {
465+
if (config == null || config.isEmpty()) {
466+
return;
467+
}
468+
ConfigValidation.validateFields(config);
469+
declarer.addConfigurations(config);
470+
}
471+
473472
/**
474473
* Given a list of bolt definitions, build a map of Storm bolts with the bolt definition id as the key.
475474
* Attempt to coerce the given constructor arguments to a matching bolt constructor as much as possible.

flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
package org.apache.storm.flux.model;
2020

21+
import java.util.HashMap;
22+
import java.util.Map;
23+
2124
/**
2225
* Abstract parent class of component definitions.
2326
* (spouts/bolts)
@@ -30,6 +33,8 @@ public abstract class VertexDef extends BeanDef {
3033
private int onHeapMemoryLoad = -1;
3134
private int offHeapMemoryLoad = -1;
3235
private int cpuLoad = -1;
36+
// per-component configuration
37+
private Map<String, Object> config = new HashMap<>();
3338

3439
public int getParallelism() {
3540
return parallelism;
@@ -70,4 +75,12 @@ public int getCpuLoad() {
7075
public void setCpuLoad(int cpuLoad) {
7176
this.cpuLoad = cpuLoad;
7277
}
78+
79+
public Map<String, Object> getConfig() {
80+
return config;
81+
}
82+
83+
public void setConfig(Map<String, Object> config) {
84+
this.config = config;
85+
}
7386
}

flux/flux-core/src/test/java/org/apache/storm/flux/FluxBuilderTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,15 @@
1717
*/
1818
package org.apache.storm.flux;
1919

20+
import java.util.HashMap;
21+
import java.util.Map;
22+
23+
import org.apache.storm.Config;
24+
import org.apache.storm.flux.model.TopologyDef;
2025
import org.junit.jupiter.api.Test;
26+
import static org.junit.jupiter.api.Assertions.assertEquals;
2127
import static org.junit.jupiter.api.Assertions.assertFalse;
28+
import static org.junit.jupiter.api.Assertions.assertThrows;
2229
import static org.junit.jupiter.api.Assertions.assertTrue;
2330

2431
public class FluxBuilderTest {
@@ -29,4 +36,26 @@ public void testIsPrimitiveNumber() {
2936
assertFalse(FluxBuilder.isPrimitiveNumber(boolean.class));
3037
assertFalse(FluxBuilder.isPrimitiveNumber(String.class));
3138
}
39+
40+
@Test
41+
public void testBuildConfigAcceptsValidTopologyConfig() {
42+
Map<String, Object> config = new HashMap<>();
43+
config.put(Config.TOPOLOGY_WORKERS, 4);
44+
TopologyDef topologyDef = new TopologyDef();
45+
topologyDef.setConfig(config);
46+
47+
Config result = FluxBuilder.buildConfig(topologyDef);
48+
assertEquals(4, result.get(Config.TOPOLOGY_WORKERS));
49+
}
50+
51+
@Test
52+
public void testBuildConfigRejectsInvalidTopologyConfig() {
53+
// topology.workers must be a positive integer; a String value is invalid
54+
Map<String, Object> config = new HashMap<>();
55+
config.put(Config.TOPOLOGY_WORKERS, "not-a-number");
56+
TopologyDef topologyDef = new TopologyDef();
57+
topologyDef.setConfig(config);
58+
59+
assertThrows(IllegalArgumentException.class, () -> FluxBuilder.buildConfig(topologyDef));
60+
}
3261
}

0 commit comments

Comments
 (0)