|
| 1 | +# Local Storm cluster (cluster mode) with Docker Compose |
| 2 | + |
| 3 | +Brings up a real, distributed Storm cluster on your machine — **dev ZooKeeper + |
| 4 | +Nimbus + two Supervisors + UI** plus an observability stack (**Pushgateway + |
| 5 | +Prometheus + Grafana**). |
| 6 | + |
| 7 | +Two supervisors with **2 worker slots each (4 slots total)** are intentional: a |
| 8 | +topology submitted with `topology.workers >= 2` lands one worker per supervisor |
| 9 | +container, so its inter-worker tuple traffic actually crosses the network — the |
| 10 | +only path where tuple serialization happens. The |
| 11 | +slot count lives in `storm.yaml` (`supervisor.slots.ports`); raise it there if |
| 12 | +you want more workers. |
| 13 | + |
| 14 | +## Architecture |
| 15 | + |
| 16 | +All containers share one Docker bridge network (`storm`) and resolve each other |
| 17 | +by service name. Host-published ports are shown in `()`. The metrics plane is |
| 18 | +detailed under [Metrics & reports](#metrics--reports-prometheus--grafana). |
| 19 | + |
| 20 | +```text |
| 21 | + host: docker compose exec nimbus storm jar ... |
| 22 | + | submit topology |
| 23 | + ========================|=========== docker network: storm ============== |
| 24 | + v |
| 25 | + +-----------+ +---------------+ +-----------+ |
| 26 | + | ZooKeeper |<---->| Nimbus :6627 |<---->| UI :8080 | |
| 27 | + +-----------+ +-------+-------+ +-----------+ |
| 28 | + | assign workers |
| 29 | + +---------------+----------------+ |
| 30 | + v v |
| 31 | + +--------------------+ tuples +--------------------+ |
| 32 | + | supervisor1 |<============>| supervisor2 | |
| 33 | + | worker :6700 | (network hop)| worker :6700 | |
| 34 | + +--------------------+ +--------------------+ |
| 35 | +
|
| 36 | + metrics: Nimbus --> Pushgateway and workers --> graphite-exporter, |
| 37 | + both scraped by Prometheus :9090 --> Grafana :3000 |
| 38 | +``` |
| 39 | + |
| 40 | +## Layout |
| 41 | + |
| 42 | +| File | Purpose | |
| 43 | +|------|---------| |
| 44 | +| `Dockerfile` | Runtime image `FROM eclipse-temurin:21-jre`, unpacks the built dist into `/opt/storm`. | |
| 45 | +| `Dockerfile.dockerignore` | Keeps the build context to just the dist tarball. | |
| 46 | +| `storm.yaml` | Cluster config (ZK + Nimbus seeds + slots), bind-mounted into every daemon. | |
| 47 | +| `docker-compose.yml` | dev ZooKeeper, Nimbus, supervisor1, supervisor2, UI, Pushgateway, graphite-exporter, Prometheus, Grafana. | |
| 48 | +| `FileReadWordCountTopo-cluster.yaml` | Sample topology config for the smoke test below. | |
| 49 | +| `storm-client.yaml` | Client config to submit topologies from the host (e.g. from IntelliJ). | |
| 50 | +| `build-image.sh` | One command: rebuild the dist from current source (lib **and** lib-worker), the Docker image, and (unless `--no-extlib`) the `extlib-daemon/` jars. | |
| 51 | +| `prepare-extlib.sh` | Builds the Prometheus reporter + deps into `extlib-daemon/` (mounted on Nimbus); run by `build-image.sh`, or standalone. | |
| 52 | +| `netsim.sh` | Inject network delay/jitter/loss between worker hosts (tc/netem) to test the network path. | |
| 53 | +| `prometheus/prometheus.yml` | Prometheus scrape config (Pushgateway + graphite-exporter). | |
| 54 | +| `graphite/graphite-mapping.yml` | Maps Storm metrics-v2 Graphite names into labelled Prometheus series. | |
| 55 | +| `grafana/` | Provisioned datasource + the **Storm Cluster** and **Storm Metrics v2** dashboards. | |
| 56 | + |
| 57 | +## Prerequisites |
| 58 | + |
| 59 | +> **Platform:** Linux or macOS (or Windows via WSL2). The helper scripts are |
| 60 | +> bash and call `mvn` (not `mvn.cmd`), and `netsim.sh` relies on Linux |
| 61 | +> `tc`/`netem`. Native Windows is not supported yet. |
| 62 | +
|
| 63 | +Build the distribution, the Docker image, **and** stage the Prometheus reporter |
| 64 | +onto Nimbus's classpath — one command: |
| 65 | + |
| 66 | +```bash |
| 67 | +dev-tools/cluster/build-image.sh |
| 68 | +``` |
| 69 | + |
| 70 | +It rebuilds `storm-client-bin` + `final-package` (so both the daemon `lib` and |
| 71 | +the worker `lib-worker` classpaths reflect your code), then builds the |
| 72 | +`storm-local` image. Building only `final-package -am` is **not** enough: it |
| 73 | +leaves `lib-worker` (the worker classpath) stale, so workers run old code. |
| 74 | + |
| 75 | +As a final step it runs `prepare-extlib.sh`, which fills `extlib-daemon/` |
| 76 | +(git-ignored build artifacts) with the Prometheus reporter jar + runtime deps |
| 77 | +that docker-compose mounts onto Nimbus. Pass `--no-extlib` (or set |
| 78 | +`PREPARE_EXTLIB=0`) to skip it, or run it standalone after changing only that |
| 79 | +module: |
| 80 | + |
| 81 | +```bash |
| 82 | +cd dev-tools/cluster |
| 83 | +./prepare-extlib.sh |
| 84 | +``` |
| 85 | + |
| 86 | +The Storm version is taken from the repo root `pom.xml` (`project.version`). |
| 87 | +`build-image.sh` reads it and writes `dev-tools/cluster/.env`; the compose file |
| 88 | +references it as `${STORM_VERSION}` (image tag, build arg, and the storm-perf jar |
| 89 | +path), so everything tracks the pom automatically. To pin a different version, |
| 90 | +run with `STORM_VERSION=x.y.z` or edit `.env`. |
| 91 | + |
| 92 | +## Run |
| 93 | + |
| 94 | +```bash |
| 95 | +cd dev-tools/cluster |
| 96 | +docker compose up --build -d # build the image and start everything |
| 97 | +docker compose ps # all services Up, zookeeper healthy |
| 98 | +docker compose logs -f nimbus # follow a daemon |
| 99 | +``` |
| 100 | + |
| 101 | +| Service | URL | Notes | |
| 102 | +|---------|-----|-------| |
| 103 | +| Storm UI | http://localhost:8080 | topologies, workers, capacity | |
| 104 | +| Grafana | http://localhost:3000 | login `admin` / `admin`; **Storm Cluster** + **Storm Metrics v2** dashboards | |
| 105 | +| Prometheus | http://localhost:9090 | raw queries / targets | |
| 106 | +| Nimbus Thrift | localhost:6627 | submit topologies from the host | |
| 107 | + |
| 108 | +Tear down — **use `-v`** so the metrics are deleted too: |
| 109 | + |
| 110 | +```bash |
| 111 | +docker compose down -v |
| 112 | +``` |
| 113 | + |
| 114 | +Prometheus and Grafana store their data on disk in the named volumes |
| 115 | +`prometheus-data` / `grafana-data` (Prometheus retention is capped at |
| 116 | +`--storage.tsdb.retention.time=2h` to keep them small). A plain `docker compose |
| 117 | +down` keeps the containers' networks gone but **leaves those volumes on disk**; |
| 118 | +`down -v` is what deletes them. The datasource and dashboards are re-provisioned |
| 119 | +from files on the next `up`, so wiping the volumes loses only metrics history |
| 120 | +and ad-hoc Grafana UI edits. |
| 121 | + |
| 122 | +## Smoke test: submit a topology |
| 123 | + |
| 124 | +The Nimbus container has the `storm-perf` jar, a sample input file and the |
| 125 | +config mounted under `/topology`. Submit the word-count topology (runs ~120s): |
| 126 | + |
| 127 | +```bash |
| 128 | +docker compose exec -d nimbus \ |
| 129 | + storm jar /topology/storm-perf.jar \ |
| 130 | + org.apache.storm.perf.FileReadWordCountTopo 120 /topology/topo.yaml |
| 131 | +``` |
| 132 | + |
| 133 | +Watch it in the UI, or via REST: |
| 134 | + |
| 135 | +```bash |
| 136 | +curl -s http://localhost:8080/api/v1/topology/summary | python3 -m json.tool |
| 137 | +``` |
| 138 | + |
| 139 | +`FileReadWordCountTopo-cluster.yaml` sets `topology.workers: 2`, so the two |
| 140 | +workers land on `supervisor1` and `supervisor2` — verify with the topology page |
| 141 | +(Worker Resources) that the two workers sit on different hosts. |
| 142 | + |
| 143 | +It is a 3-stage pipeline; spreading it across two workers makes at least one |
| 144 | +edge a network hop (where tuple serialization happens): |
| 145 | + |
| 146 | +```text |
| 147 | + FileReadSpout --shuffle (network hop)--> SplitSentenceBolt --fieldsGrouping--> CountBolt |
| 148 | + (emits text lines) (emits words) (counts) |
| 149 | +``` |
| 150 | + |
| 151 | +## Simulating network latency and jitter |
| 152 | + |
| 153 | +Inter-worker traffic between containers is near-instant (~0.05 ms), which hides |
| 154 | +the network cost. `netsim.sh` adds realistic |
| 155 | +latency/jitter/loss to the worker hosts with Linux `tc`/`netem`. The Storm image |
| 156 | +has no `tc`, so the script injects the qdisc from a throwaway helper container |
| 157 | +sharing each supervisor's network namespace — no image rebuild needed. |
| 158 | + |
| 159 | +```bash |
| 160 | +./netsim.sh add 50 10 0 # 50 ms delay, 10 ms jitter, 0% loss on each supervisor |
| 161 | +./netsim.sh ping # verify: worker<->worker RTT jumps to ~100 ms (2x egress) |
| 162 | +./netsim.sh show # inspect the active qdisc |
| 163 | +./netsim.sh clear # remove shaping |
| 164 | +``` |
| 165 | + |
| 166 | +netem shapes **all** egress from each supervisor (inter-worker tuples *and* |
| 167 | +heartbeats to Nimbus/ZK), so keep the delay moderate (≤ ~150 ms) or heartbeats |
| 168 | +may time out. With both supervisors delayed by `D`, worker round-trip latency is |
| 169 | +~`2*D`. |
| 170 | + |
| 171 | +> **Why the script sets a huge queue `limit`.** netem's default queue is only |
| 172 | +> 1000 packets. Under a high-throughput perf topology that buffer overflows at |
| 173 | +> the added delay and drops tuples even with `loss 0%`, which collapses TCP and |
| 174 | +> back-pressures the spout to **zero throughput** (you'll see `transferred 0`). |
| 175 | +> `netsim.sh` therefore sets `limit 1000000` (override as the 4th arg) so the |
| 176 | +> queue can hold `rate * delay` without dropping. If you ever apply `tc netem` |
| 177 | +> by hand, remember to add a large `limit`. |
| 178 | +
|
| 179 | +## Metrics & reports (Prometheus + Grafana) |
| 180 | + |
| 181 | +Two metric paths feed Prometheus, both push-based (so ephemeral workers need no |
| 182 | +scrape targets), and Grafana auto-loads a dashboard for each: |
| 183 | + |
| 184 | +```text |
| 185 | + Nimbus --push--> Pushgateway:9091 -----------------scrape-------------+ |
| 186 | + v |
| 187 | + supervisor1 worker --+ Prometheus:9090 --> Grafana:3000 |
| 188 | + +-- graphite:9109 --> graphite-exporter:9108 --scrape--+ | |
| 189 | + supervisor2 worker --+ +--> "Storm Cluster" |
| 190 | + +--> "Storm Metrics v2" |
| 191 | +``` |
| 192 | + |
| 193 | + |
| 194 | +1. **Cluster summary** — *Storm Cluster* dashboard |
| 195 | + `Nimbus → Pushgateway → Prometheus`. Nimbus runs Storm's |
| 196 | + `PrometheusPreparableReporter` (enabled via `-c` overrides in |
| 197 | + `docker-compose.yml`, jars from `extlib-daemon/`) and pushes cluster-summary |
| 198 | + metrics every 10s. Prometheus scrapes the Pushgateway (`honor_labels` keeps |
| 199 | + `job="nimbus"`). |
| 200 | +2. **Metrics v2 (per-worker/topology)** — *Storm Metrics v2* dashboard |
| 201 | + `workers → graphite-exporter → Prometheus`. Every worker runs the |
| 202 | + `GraphiteStormReporter` (configured in `storm.yaml` under |
| 203 | + `topology.metrics.reporters`) and emits its full Dropwizard metric set in |
| 204 | + Graphite plaintext to the graphite-exporter, which `graphite-mapping.yml` |
| 205 | + turns into labelled `storm_worker{...}` / `storm_topology{...}` series. |
| 206 | + |
| 207 | +The pushed series are cluster-level (not per-topology): `summary_cluster_num_supervisors`, |
| 208 | +`summary_cluster_num_topologies`, `summary_cluster_num_total_workers`, |
| 209 | +`summary_cluster_num_total_used_workers`, `nimbus_total_cpu`, |
| 210 | +`nimbus_available_cpu_non_negative`, `nimbus_total_memory`, and the |
| 211 | +`summary_topologies_assigned_*` histograms. Quick check: |
| 212 | + |
| 213 | +```bash |
| 214 | +curl -s 'http://localhost:9090/api/v1/query?query=summary_cluster_num_total_workers' |
| 215 | +``` |
| 216 | + |
| 217 | +### Storm Metrics v2 dashboard |
| 218 | + |
| 219 | +Metrics v2 are emitted **per task** (`org.apache.storm.metrics2.TaskMetrics`), so |
| 220 | +the dashboard is filtered by a chained `topology → host → component → task` |
| 221 | +variable set, and every series carries `topology_id`, `host`, `component`, |
| 222 | +`task`, `port` labels. |
| 223 | + |
| 224 | +`graphite-mapping.yml` models `TaskMetrics` explicitly into clean metrics. Each |
| 225 | +is per `(component, task)`; the `key` label is the metric key — the **own output |
| 226 | +stream** for emit/transfer, or the **`sourceComponent:sourceStream`** for the |
| 227 | +input metrics (execute/ack/fail/latency): |
| 228 | + |
| 229 | +| Prometheus metric | TaskMetrics source | type | |
| 230 | +|---|---|---| |
| 231 | +| `storm_emit_rate` / `storm_emit_total` | `__emit-count` (`.m1_rate` / `.count`) | RateCounter | |
| 232 | +| `storm_transfer_rate` / `storm_transfer_total` | `__transfer-count` | RateCounter | |
| 233 | +| `storm_execute_rate` / `storm_execute_total` | `__execute-count` | RateCounter | |
| 234 | +| `storm_ack_rate` / `storm_ack_total` | `__ack-count` | RateCounter | |
| 235 | +| `storm_fail_rate` / `storm_fail_total` | `__fail-count` | RateCounter | |
| 236 | +| `storm_execute_latency_ms` | `__execute-latency` | RollingAverageGauge (ms) | |
| 237 | +| `storm_process_latency_ms` | `__process-latency` | RollingAverageGauge (ms) | |
| 238 | +| `storm_complete_latency_ms` | `__complete-latency` (spout) | RollingAverageGauge (ms) | |
| 239 | +| `storm_execute_jitter_ms` | `__execute-jitter` | EwmaGauge (ms) | |
| 240 | +| `storm_process_jitter_ms` | `__process-jitter` | EwmaGauge (ms) | |
| 241 | +| `storm_complete_jitter_ms` | `__complete-jitter` (spout) | EwmaGauge (ms) | |
| 242 | +| `storm_capacity` | `__capacity` (over all streams) | RollingAverageGauge (0..1) | |
| 243 | + |
| 244 | +Counts/rates are **sampling-scaled** (`topology.stats.sample.rate`), so they |
| 245 | +estimate true values; `.m1_rate` is tuples/s averaged over 1 minute. The |
| 246 | +**jitter** metrics are RFC 3550 EWMA latency-variation estimators and only flow |
| 247 | +when `topology.stats.ewma.enable: true` (set in `storm.yaml`) — pair them with |
| 248 | +`netsim.sh` to see network jitter propagate into per-task latency variation. |
| 249 | + |
| 250 | +Everything else falls through to generic series, still fully queryable: |
| 251 | +- `storm_worker{metric=...}` — `__skipped-*`, `__backpressure-last-overflow-count`, |
| 252 | + `__send-iconnection-*`, `doHeartbeat-calls`. |
| 253 | +- `storm_topology{component="__system"}` — per-worker JVM (`task=-1`): |
| 254 | + `memory.heap.*`, `memory.non-heap.*`, `memory.pools.*`, `GC.*.{count,time}`, |
| 255 | + `threads.*`. |
| 256 | + |
| 257 | +List everything currently flowing: |
| 258 | + |
| 259 | +```bash |
| 260 | +curl -s http://localhost:9090/api/v1/label/metric/values | python3 -m json.tool |
| 261 | +``` |
| 262 | + |
| 263 | +## Notes |
| 264 | + |
| 265 | +- The bundled `storm dev-zookeeper` is single-node and for development only; it |
| 266 | + does not snapshot. Swap in a real ZooKeeper for anything beyond local testing. |
| 267 | +- Heaps are kept small in `storm.yaml` so the whole cluster fits on a laptop. |
| 268 | + Bump `worker.childopts` / `*.childopts` for heavier topologies. |
| 269 | +- To run a different topology, mount its jar into the `nimbus` service (see the |
| 270 | + `volumes:` of that service) and `storm jar` it the same way. |
0 commit comments