Commit 0cf51b6

Add tuple compression for inter-worker communication (#8707)
* init config
* (de)serialization logic
* deserialization logic refinement, unit tests
* docs + storm perf example
* apply new checkstyle rules
* config param `topology.tuple.compression.max.decompressed.bytes`
* improve deserialize zstd false positive collision logic
* enter the decompress branch when the topology actually uses compression
* minor changes
* docs changes + additional test case
* remove unnecessary arrays allocation in `KryoTupleSerializer`
* add LOG.debug for deserialize collision
* ad-hoc test cases for false positive isZstd deserializer
* add bench in docs
1 parent 9d6b1bf commit 0cf51b6

10 files changed

Lines changed: 797 additions & 15 deletions

conf/defaults.yaml

Lines changed: 3 additions & 0 deletions
@@ -55,9 +55,12 @@ storm.nimbus.zookeeper.acls.fixup: true
 storm.auth.simple-white-list.users: [ ]
 storm.cluster.state.store: "org.apache.storm.cluster.ZKStateStorageFactory"
 storm.meta.serialization.delegate: "org.apache.storm.serialization.ZstdBridgeThriftSerializationDelegate"
+topology.tuple.compression.threshold: 1460
+topology.tuple.compression.enable: false
 storm.compression.zstd.level: 3
 storm.compression.zstd.max.decompressed.bytes: 104857600
 storm.compression.gzip.max.decompressed.bytes: 104857600
+topology.tuple.compression.max.decompressed.bytes: 10485760
 storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor"
 storm.workers.artifacts.dir: "workers-artifacts"
 storm.health.check.dir: "healthchecks"

docs/Serialization.md

Lines changed: 83 additions & 0 deletions
@@ -61,6 +61,89 @@ Beware that Java serialization is extremely expensive, both in terms of CPU cost

You can turn on/off the behavior to fall back on Java serialization by setting the `Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION` config to true/false. The default value is false for security reasons.

### Tuple compression

For inter-worker (remote) traffic, Storm can optionally compress serialized tuples with [Zstandard](https://facebook.github.io/zstd/) before they are sent over the network. This is intended for one specific scenario: components that emit **large** payloads to a remote worker, where the bytes saved on the wire outweigh the CPU cost of compression. A good example is a spout that emits entire lines of text to a downstream bolt running on a different worker.

Compression is **disabled by default** and follows the serialization lifecycle exactly:

- **Intra-worker (local) traffic** bypasses `KryoTupleSerializer` altogether, so it is never compressed regardless of configuration. You do not pay any CPU cost for tuples that stay inside a worker process.
- **Inter-worker (remote) traffic** is compressed only when compression is enabled for the source component *and* the serialized tuple is larger than the configured threshold. Small tuples (single words, IDs, etc.) are left uncompressed, since the framing overhead of a compressed payload can exceed the original size.
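
The two bullets above amount to a simple gate on the send path. The following is a minimal sketch of that decision, not the code from this commit: `CompressionGate` and its parameter names are hypothetical, and the strict "larger than" comparison is taken from the wording of the threshold description.

```java
/** Hypothetical helper illustrating the send-side decision; not a Storm class. */
final class CompressionGate {
    /** Default of topology.tuple.compression.threshold, in bytes. */
    static final int DEFAULT_THRESHOLD_BYTES = 1460;

    /**
     * Returns true only for remote-bound tuples whose source component enables
     * compression and whose serialized size is strictly above the threshold.
     */
    static boolean shouldCompress(boolean remote,
                                  boolean enabledForSource,
                                  int serializedLength,
                                  int thresholdBytes) {
        return remote && enabledForSource && serializedLength > thresholdBytes;
    }
}
```

Under these assumptions a 1460-byte tuple is sent as-is with the default threshold, while a 1461-byte tuple from a compression-enabled component is compressed.
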
#### Enabling compression per component

Compression is controlled by the component-specific configuration `topology.tuple.compression.enable`. Because Storm merges component-specific configuration over the topology configuration, you can enable it for just the components that emit large tuples, leaving the rest of the topology untouched:

```java
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(SPOUT_ID, new FileReadSpout(inputFile), spoutNum)
        .addConfiguration(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE, true);

builder.setBolt(SPLIT_ID, new SplitSentenceBolt(), spBoltNum)
        .localOrShuffleGrouping(SPOUT_ID);
builder.setBolt(COUNT_ID, new CountBolt(), cntBoltNum)
        .fieldsGrouping(SPLIT_ID, new Fields(SplitSentenceBolt.FIELDS));
```

You can also enable it topology-wide (or cluster-wide via `storm.yaml`) by setting `topology.tuple.compression.enable: true`, but enabling it only where large tuples are actually emitted is recommended.

#### Flux

> **Note:** With [Flux](flux.html), only **topology-wide** enablement is currently possible. Flux has no per-component configuration mechanism: `FluxBuilder` applies only parallelism, number of tasks, memory/CPU load, and groupings to the underlying declarers, and the `config:` block is topology-scoped. There is no Flux equivalent of `declarer.addConfiguration(...)`, so the per-component approach recommended above cannot be expressed in a Flux YAML definition.

To enable compression for a Flux topology, set it in the topology-level `config:` block:

```yaml
config:
  topology.tuple.compression.enable: true
  topology.tuple.compression.threshold: 1460
```

Be aware that this enables compression for *every* remote-bound tuple in the topology that exceeds the threshold.

#### Configuration reference

| Config | Default | Description |
| --- | --- | --- |
| `topology.tuple.compression.enable` | `false` | Enables Zstd compression of serialized tuples before remote transfer. Best set per component via `addConfiguration`. |
| `topology.tuple.compression.threshold` | `1460` | Minimum serialized tuple size, in bytes, before compression is attempted. Tuples at or below this size are sent uncompressed. The default matches the typical Ethernet TCP MSS, so payloads that already fit in a single network frame are never compressed. |
| `storm.compression.zstd.level` | `3` | Zstd compression level. Supported range is 1–19; levels 20–22 (ultra mode) are prohibited because of their memory requirements. |
| `topology.tuple.compression.max.decompressed.bytes` | `10485760` (10 MB) | Upper bound on the decompressed size of a single tuple. Decompression that would exceed this limit fails, guarding against malicious or corrupt payloads. |
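
To make the defaults and the level restriction in the table concrete, here is a small sketch that reads the four settings from a plain configuration map. `TupleCompressionSettings` is a hypothetical holder, not a Storm class, and rejecting an out-of-range level with an exception is an assumption; the table only says that levels 20–22 are prohibited, not how Storm reacts to them.

```java
import java.util.Map;

/** Hypothetical holder for the four settings in the table; not a Storm class. */
final class TupleCompressionSettings {
    final boolean enabled;
    final int thresholdBytes;
    final int zstdLevel;
    final long maxDecompressedBytes;

    private TupleCompressionSettings(boolean enabled, int thresholdBytes,
                                     int zstdLevel, long maxDecompressedBytes) {
        this.enabled = enabled;
        this.thresholdBytes = thresholdBytes;
        this.zstdLevel = zstdLevel;
        this.maxDecompressedBytes = maxDecompressedBytes;
    }

    /** Reads the settings, falling back to the defaults listed in the table. */
    static TupleCompressionSettings from(Map<String, Object> conf) {
        boolean enabled = Boolean.TRUE.equals(conf.get("topology.tuple.compression.enable"));
        int threshold = number(conf, "topology.tuple.compression.threshold", 1460).intValue();
        int level = number(conf, "storm.compression.zstd.level", 3).intValue();
        long maxBytes = number(conf, "topology.tuple.compression.max.decompressed.bytes",
                10_485_760L).longValue();
        // Assumption: treat levels outside the documented 1..19 range as an error.
        if (level < 1 || level > 19) {
            throw new IllegalArgumentException("zstd level must be in 1..19, got " + level);
        }
        return new TupleCompressionSettings(enabled, threshold, level, maxBytes);
    }

    /** Returns the numeric value stored under key, or the fallback if absent or non-numeric. */
    private static Number number(Map<String, Object> conf, String key, Number fallback) {
        Object value = conf.get(key);
        return value instanceof Number ? (Number) value : fallback;
    }
}
```

An empty map yields the documented defaults: disabled, a 1460-byte threshold, level 3, and a 10 MB decompression bound.
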

#### How decompression works

Compression is self-describing on the wire, so **no extra configuration is required on the receiving side**. The deserializer inspects the leading bytes of each incoming payload: if they match the Zstd magic header it decompresses the payload (bounded by `topology.tuple.compression.max.decompressed.bytes`) before deserializing, otherwise it deserializes the bytes directly. A single deserializer therefore transparently handles a mix of compressed and uncompressed tuples.
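
The magic-header test described above can be sketched as follows. A Zstandard frame starts with the magic number `0xFD2FB528` stored little-endian, that is, the bytes `28 B5 2F FD`. `ZstdMagic` is a hypothetical helper, not the deserializer from this commit. The prefix check alone can misfire on an uncompressed payload that happens to begin with the same four bytes; the commit message mentions extra handling for such false-positive collisions, which this sketch does not attempt to reproduce.

```java
/** Hypothetical helper showing the Zstd magic-header check; not a Storm class. */
final class ZstdMagic {
    /** Zstandard frame magic number 0xFD2FB528 in little-endian byte order. */
    private static final byte[] MAGIC = {(byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD};

    /** Returns true if the payload is long enough and starts with the Zstd magic bytes. */
    static boolean looksLikeZstd(byte[] payload) {
        if (payload == null || payload.length < MAGIC.length) {
            return false;
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (payload[i] != MAGIC[i]) {
                return false;
            }
        }
        return true;
    }
}
```

A receiver built this way would route payloads for which `looksLikeZstd` returns true through bounded decompression and hand everything else straight to the regular deserializer.
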

As an optimization, the deserializer determines once, when the worker starts, whether *any* component in the topology enables compression (by scanning the merged per-component configurations). If none does, the magic-header check is skipped entirely and the Zstd code path is never touched, so topologies that do not use the feature pay no per-tuple cost. The corollary is that compression must be enabled somewhere in the topology config for compressed tuples to be decompressed on receipt; since the setting is part of the topology configuration shared by all of its workers, this is always the case for tuples produced within the same topology.
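
A rough sketch of that one-time scan, assuming the merged per-component configurations are available as a map from component ID to configuration map. That shape, the class name `CompressionScan`, and the method name are illustrative assumptions; the commit's actual data structures are not shown in this excerpt.

```java
import java.util.Map;

/** Hypothetical one-time scan over merged per-component configs; not a Storm class. */
final class CompressionScan {
    static final String ENABLE_KEY = "topology.tuple.compression.enable";

    /** Returns true if at least one component's merged config enables compression. */
    static boolean anyComponentEnablesCompression(
            Map<String, Map<String, Object>> mergedConfByComponent) {
        for (Map<String, Object> conf : mergedConfByComponent.values()) {
            if (Boolean.TRUE.equals(conf.get(ENABLE_KEY))) {
                return true;
            }
        }
        return false;
    }
}
```

The result would be computed once and cached in a field, so the per-tuple path only reads a boolean before deciding whether to look at the magic header at all.
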

#### Indicative benchmark

> **Disclaimer:** These numbers were gathered in a limited capacity while developing this feature and should be treated as a rough guide only, not as a performance guarantee. They were produced on a specific, deliberately favourable setup and your results will vary with topology shape, tuple size, network characteristics, and hardware.

The benchmark ran two equivalent word-count topologies defined in `storm-perf`, one with tuple compression enabled on the spout component (`FileReadWordCountSpoutCompressionTopo`) and one without (`FileReadWordCountTopo`), across workers connected by a simulated network with **10 ms latency** and **0.5 ms jitter**. This does not represent a typical intra-datacenter network; it deliberately emphasizes the maximum advantage the feature can offer when configured well. The tuple size used is the smallest that still yields a real benefit from compression (~1.5 KB).

Sample round-trip ping between two supervisors on the Docker network:
```
--- cluster-supervisor2-1 ping statistics ---
5 packets transmitted, 5 received, 0% packet loss, time 4004ms
rtt min/avg/max/mdev = 18.767/24.353/42.486/9.083 ms
```

Results (compression vs. no compression):

| Metric | Compression | No compression | Difference | Better |
| --- | --- | --- | --- | --- |
| Avg transfer rate (msg/s) | 776,389 | 744,544 | +31,845 (+4.3%) | Compression |
| Peak transfer rate (msg/s) | 805,700 | 790,300 | +15,400 | Compression |
| Avg spout throughput (acks/s) | 98,167 | 92,844 | +5,323 (+5.8%) | Compression |
| Peak spout throughput (acks/s) | 100,300 | 98,666 | +1,634 | Compression |
| Avg complete latency (ms) | 362.48 | 376.73 | -14.25 (-3.8%) | Compression |
| Max complete latency (ms) | 366.44 | 385.72 | -19.28 | Compression |
| Runtime stability | More consistent | More fluctuation | n/a | Compression |

In this configuration, compression improved transfer rate and spout throughput by roughly 4–6% and reduced complete latency by a few percent, while also producing more consistent per-task behaviour (less jitter across tasks). The takeaway is qualitative: when large tuples cross a high-latency link, trading CPU for fewer bytes on the wire can pay off, but you should measure with your own workload before enabling it broadly.
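
The percentages in the Difference column appear to be relative to the no-compression baseline. The sketch below recomputes them from the values printed in the table; `BenchmarkDelta` is a hypothetical helper written for this illustration and is not part of `storm-perf`.

```java
/** Hypothetical helper that recomputes the relative differences from the table. */
final class BenchmarkDelta {
    /** Relative difference of the compression run against the baseline, in percent. */
    static double relativeDiffPercent(double withCompression, double baseline) {
        return (withCompression - baseline) / baseline * 100.0;
    }

    public static void main(String[] args) {
        System.out.printf("avg transfer rate:    %+.1f%%%n", relativeDiffPercent(776_389, 744_544));
        System.out.printf("avg spout throughput: %+.1f%%%n", relativeDiffPercent(98_167, 92_844));
        System.out.printf("avg complete latency: %+.1f%%%n", relativeDiffPercent(362.48, 376.73));
    }
}
```

Recomputed this way, the average transfer rate and complete latency reproduce the table's +4.3% and -3.8%. The spout throughput comes out at about +5.7% rather than the listed +5.8%, a small gap that may come from rounding in the published averages.
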

### Component-specific serialization registrations

Storm 0.7.0 lets you set component-specific configurations (read more about this at [Configuration](Configuration.html)). Of course, if one component defines a serialization that serialization will need to be available to other bolts -- otherwise they won't be able to receive messages from that component!

Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License
 */

package org.apache.storm.perf;

import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.perf.bolt.CountBolt;
import org.apache.storm.perf.bolt.SplitSentenceBolt;
import org.apache.storm.perf.spout.FileReadSpout;
import org.apache.storm.perf.utils.Helper;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

/**
 * This topo helps measure speed of word count.
 *
 * <p>Spout loads a file into memory on initialization, then emits the lines in an endless loop.
 */
public class FileReadWordCountSpoutCompressionTopo {
    public static final String SPOUT_ID = "spout";
    public static final String COUNT_ID = "counter";
    public static final String SPLIT_ID = "splitter";
    public static final String TOPOLOGY_NAME = "FileReadWordCountSpoutCompressionTopo";

    // Config settings
    public static final String SPOUT_NUM = "spout.count";
    public static final String SPLIT_NUM = "splitter.count";
    public static final String COUNT_NUM = "counter.count";
    public static final String INPUT_FILE = "input.file";

    public static final int DEFAULT_SPOUT_NUM = 1;
    public static final int DEFAULT_SPLIT_BOLT_NUM = 2;
    public static final int DEFAULT_COUNT_BOLT_NUM = 2;


    static StormTopology getTopology(Map<String, Object> config) {

        final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
        final int spBoltNum = Helper.getInt(config, SPLIT_NUM, DEFAULT_SPLIT_BOLT_NUM);
        final int cntBoltNum = Helper.getInt(config, COUNT_NUM, DEFAULT_COUNT_BOLT_NUM);
        final String inputFile = Helper.getStr(config, INPUT_FILE);

        TopologyBuilder builder = new TopologyBuilder();
        // sampledata/longrandomwords.txt contains sentences with at least 1500 bytes
        builder.setSpout(SPOUT_ID, new FileReadSpout(inputFile), spoutNum)
                .addConfiguration(Config.TOPOLOGY_TUPLE_COMPRESSION_ENABLE, true);
        builder.setBolt(SPLIT_ID, new SplitSentenceBolt(), spBoltNum).localOrShuffleGrouping(SPOUT_ID);
        builder.setBolt(COUNT_ID, new CountBolt(), cntBoltNum).fieldsGrouping(SPLIT_ID, new Fields(SplitSentenceBolt.FIELDS));

        return builder.createTopology();
    }

    public static void main(String[] args) throws Exception {
        int runTime = -1;
        Config topoConf = new Config();
        if (args.length > 0) {
            runTime = Integer.parseInt(args[0]);
        }
        if (args.length > 1) {
            topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
        }
        topoConf.put(Config.TOPOLOGY_PRODUCER_BATCH_SIZE, 1000);
        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY, "org.apache.storm.policy.WaitStrategyPark");
        topoConf.put(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC, 0);
        topoConf.put(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING, true);
        topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.0005);

        topoConf.putAll(Utils.readCommandLineOpts());
        if (args.length > 2) {
            System.err.println("args: [runDurationSec] [optionalConfFile]");
            return;
        }
        // Submit topology to storm cluster
        Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
    }
}
