Skip to content

WSM-DDIA/Weather-Stations-Monitoring

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

123 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Weather Stations Monitoring

Design Data-Intensive Apps Course Project

Project Description

The Internet of Things (IoT) is an important source of data streams in the modern digital world.
The “Things” are huge in count and emit messages in very high frequency which flood the
global internet. Hence, efficient stream processing is inevitable.

One use case is the distributed weather stations use case. Each “weather station” emits
readings for the current weather status to the “central base station” for persistence and
analysis. In this project, you will find the implementation of the architecture of a weather
monitoring system.

Authors

Ahmed Aboeleid

Mohamed Salama

Youssef Bazina

Table of Content

Setup

Note This setup for Ubuntu

Local Setup

  1. Run Kafka Commands Respectively

    Service Command
    Zoo Keeper bin/zookeeper-server-start.sh config/zookeeper.properties
    Kafka Server bin/kafka-server-start.sh config/server.properties
    Create Weather Status Messages Topic bin/kafka-topics.sh --create --topic weather-status-messages --bootstrap-server localhost:9092
    Create Raining Status Messages Topic bin/kafka-topics.sh --create --topic raining-status-messages --bootstrap-server localhost:9092
    Run Kafka Consumer bin/kafka-console-consumer.sh --topic weather-status-messages --from-beginning --bootstrap-server localhost:9092
  2. Run Bitcask.

  3. Run Central Station.

  4. Run Weather Station. Run multiple instances with arguments station_id latitude longitude to simulate multiple stations, e.g. 1 30.0444 31.2357.

  5. Run Kafka Processor.

Now you can see the messages in each terminal.

K8s Setup

Cluster used: kind (named desktop). Requires Docker, kubectl, kind installed locally.

One-shot deploy (recommended):

./scripts/deploy.sh           # full bring-up
./scripts/deploy.sh --wipe    # wipe PVCs + ES index, then deploy
./scripts/deploy.sh --teardown

The script does, in order:

  1. Builds Docker images for bitcask, central-station, weather-station, kafka-processor and loads them into the kind cluster (kind load docker-image).
  2. Applies k8s/kafka.yaml — single-broker KRaft Kafka. Topics auto-create on first produce (KAFKA_AUTO_CREATE_TOPICS_ENABLE=true).
  3. Applies k8s/elk-stack.yaml — Elasticsearch + Kibana (nshou/elasticsearch-kibana).
  4. Waits for ES, parses the auto-generated elastic password from pod logs, writes/updates the es-credentials Secret (used by Central Station via secretKeyRef).
  5. Applies bitcask/bitcask.yaml — Bitcask StatefulSet + headless service.
  6. Applies kafkaProcessor/kafka-processor.yaml — Kafka Streams processor.
  7. Applies centralstation/central-station.yaml — Central Station Deployment + parquet PVC.
  8. Applies weatherStation/stations.yaml — Weather Station StatefulSet (10 replicas).

Manual order (if you skip the script):

kubectl apply -f k8s/kafka.yaml
kubectl rollout status statefulset/kafka

kubectl apply -f k8s/elk-stack.yaml
kubectl wait --for=condition=Ready pod -l app=elk-stack
ES_PWD=$(./scripts/get-es-password.sh)
kubectl create secret generic es-credentials \
    --from-literal=username=elastic \
    --from-literal=password="$ES_PWD" \
    --dry-run=client -o yaml | kubectl apply -f -

kubectl apply -f bitcask/bitcask.yaml
kubectl apply -f kafkaProcessor/kafka-processor.yaml
kubectl apply -f centralstation/central-station.yaml
kubectl apply -f weatherStation/stations.yaml

Access:

kubectl port-forward svc/kibana 5601:5601          # Kibana UI
kubectl port-forward svc/elasticsearch 9200:9200   # ES API

Login as elastic with the password from ./scripts/get-es-password.sh.

Configuration

All tunables exposed as env vars on the K8s manifests.

Kafka — k8s/kafka.yaml

Env Default Purpose
KAFKA_PROCESS_ROLES broker,controller KRaft combined mode
KAFKA_AUTO_CREATE_TOPICS_ENABLE true Convenience for dev
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR 1 Single-broker dev cluster
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR 1 Single-broker dev cluster
KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR 1 Single-broker dev cluster

These three replication-factor knobs are mandatory for a 1-broker cluster — otherwise the __consumer_offsets / __transaction_state topics fail to auto-create on first consumer join, and every consumer hangs in a join/leave loop.

Env Default Purpose
BITCASK_PORT 9090 Server TCP port
BITCASK_MEMORY_LIMIT 4096 Active-file rollover threshold in bytes; smaller value = more frequent rotation + compaction
Env Default Purpose
KAFKA_BROKER kafka-service:9092 Bootstrap servers
KAFKA_TOPIC weather-status-messages Source topic
BITCASK_IP / BITCASK_PORT bitcask-0.bitcask-service:9090 Bitcask server
BITCASK_DIRECTORY /data/bitcask Bitcask DB path
ES_HOST / ES_PORT / ES_SCHEME elasticsearch:9200/https ES endpoint
ES_USER / ES_PASSWORD from es-credentials secret ES auth
STATION_COUNT 10 Number of stations for reader to scan
READER_INITIAL_DELAY_SEC 30 Delay before first Bitcask read
READER_PERIOD_SEC 30 Bitcask read period
PARQUET_BATCH_SIZE 100 Records per parquet file (controls flush + ES upload cadence)

Logging

Both modules ship simplelogger.properties in src/main/resources with defaultLogLevel=info and per-package overrides (bitCask=debug, centralstation=debug). To override at runtime without rebuilding, set JAVA_TOOL_OPTIONS on the container, e.g.:

- name: JAVA_TOOL_OPTIONS
  value: "-Dorg.slf4j.simpleLogger.defaultLogLevel=debug"

The deployment's command: overrides the Dockerfile ENTRYPOINT, which strips any -D flags baked into the image — JAVA_TOOL_OPTIONS is picked up by the JVM regardless.

Operations

Topics

kubectl exec kafka-0 -- /opt/kafka/bin/kafka-topics.sh \
    --bootstrap-server kafka-service:9092 --list
kubectl exec kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka-service:9092 \
    --topic weather-status-messages --from-beginning --max-messages 5

Bitcask folder

kubectl exec bitcask-0 -- ls -lah /data/bitcask
kubectl cp bitcask-0:/data/bitcask ./bitcask-snapshot
kubectl logs -f bitcask-0 | grep -E "compaction|rename"

Active file (<timestamp>.bitcask.data) rotates when it exceeds BITCASK_MEMORY_LIMIT. Older replicas are merged into a single compacted file (<timestamp>.bitcask.datam → renamed by the scheduled compactor; the hint file is rebuilt alongside it).

Parquet folder

kubectl exec deploy/central-station -- ls -R /app/Parquet_Files_Directory

Path layout: Parquet_Files_Directory/{YYYY}/{MM}/{DD}/Station_{id}/Version_{n}_{startMillis}.parquet. The file is finalised (renamed to add the .parquet suffix) when its row count reaches PARQUET_BATCH_SIZE; the indexer picks it up immediately via the listener callback.

Logs

kubectl logs -f deploy/central-station
kubectl logs -f deploy/processor
kubectl logs -f statefulset/bitcask
kubectl logs -f weatherstation-0

Wipe data without redeploying

./scripts/deploy.sh --wipe   # nukes PVCs + ES index, then redeploys

Kibana Visualisations

Open Kibana → Stack Management → Data Views → Create data view:

  • Name: weather-status
  • Index pattern: weather-status
  • Timestamp field: status_timestamp

1. Low-battery count per station

Visualize Library → Create → Lens, data view weather-status:

  • Horizontal axis: station_id (Top values, size 10)
  • Vertical axis: Count of records
  • Top filter (KQL): battery_status: "low"
  • Chart: Bar vertical stacked
  • Save as Low-battery count per station

2. Dropped messages per station

The Weather Station producer drops every 10th attempt deterministically (DROP_EVERY_N = 10 in WeatherStationProducer.java). The s_no published to Kafka is gapless (only incremented on send), so drop count is computed against attempts via the producer log:

kubectl logs weatherstation-0 | grep -c "Dropped message"
kubectl logs weatherstation-0 | grep -c "Sent:"

Inside Kibana, an approximate "dropped" metric per station is:

Lens formula on weather-status:  max(s_no) - count()

Note: with the gapless producer this measures records not yet flushed to ES (waiting in the active parquet batch). To track actual drops, scrape the producer logs above, or extend the payload with an attempt_no field.

Result screenshots

Battery status distribution per station — confirms the spec 30% low / 40% medium / 30% high distribution.

Battery distribution per station

Dropped messages per station(max(s_no) - min(s_no) + 1 - count()) ≈ 9.91% gap, confirming the 10% producer drop rate.

Dropped messages per station

Sanity check via Dev Tools

GET weather-status/_search
{
  "size": 0,
  "aggs": {
    "by_station": {
      "terms": { "field": "station_id" },
      "aggs": {
        "low_batt": { "filter": { "term": { "battery_status": "low" } } },
        "max_sno":  { "max": { "field": "s_no" } },
        "total":    { "value_count": { "field": "s_no" } }
      }
    }
  }
}

low_batt.doc_count / total should sit around 0.30 (per the spec 30/40/30 battery distribution).

System Architecture

System Architecture Diagram

Data Acquisition

Multiple Weather Stations which feed a message queueing service Kafka with their readings.

Weather Stations

  • Implemented in Weather Station
  • Weather station gets its data from Open-Meteo API according to a latitude and longitude the API.
  • Data is fetched every 1 second.
  • The properties of this data is battery distribution(30% low - 40% medium - 30% high) and dropping percentage of 10%.
  • Built using Adapter Integration Pattern to connect our App to Open-Meteo and to receive data on the needed-form.

Kafka Processor

  • Implemented in Kafka Processor.
  • There are two types of processing following
Processing Type Description
Dropping Messages Processes messages by probabilistic sampling of 10%, then throw some of them away
Raining Areas Processes messages and detects rain when humidity > 70%, then pass new messages to raining topic. Kafka Streams and Filters are used to do this
  • Kafka streams produce messages to Weather Topic And records which have humidity > 70 (Pipe & Filter Patterns) go to Raining Topic.
  • Built using Envelope Wrapper as each message Kafka streams unwraps it and processes it then wraps it again as raining status message.
  • Dropped messages go to Invalid Channel which is a RocksDB.

Data Processing and Archiving

  • Data Processing and Archiving is implemented in Central Station.
  • It consumes messages from Weather Topic and Raining Topic.
  • It then writes them to Parquet Files, Parquet writer aggregates every 10k records and flushes them to the file.
  • Files are partitioned by day and station_id.
  • When the writer shuts down, when it restarts it will create a new file for the same station if it's the same day with new version number.

Base Central Station UML Diagram

Data Indexing

  1. BitCask Storage
  2. Elasticsearch and Kibana

Bitcask Storage

  • Implemented in bitcask With JavaDocs.
  • We implemented the BitCask Riak LSM to maintain an updated store of each station status as discussed in This paper
    • Scheduled Compaction over Replica Files to avoid disrupting active readers.

    • Tombstones for deletions to mark deleted entries, so they are skipped at compaction process.

    • The Entry Structure in active and replica files is as follows

      ENTRY timestamp key size value size key value
      SIZE 8 bytes 4 bytes 4 bytes key size value size
    • The Entry Structure in hint files is as follows

      ENTRY timestamp key size value size value position key
      SIZE 8 bytes 4 bytes 4 bytes 8 bytes key size
    • Crash Recovery Mechanism

      • Create a new in-memory structure called keydir.
      • Reads hint files if found, from start to end, and fill keydir with key value pairs.
      • If hint file is not found for specific timestamp, it reads Active file, from start to end, and fill keydir with key value pairs.
    • Compaction Mechanism

      • Loop on all replica files, read each replica file from start to end, add its key value pairs to hashMap.
      • Loop on keydir, write each key value as entry in a compacted file.
      • Delete replica files.
    • MultiWriter concurrency Mechanism

      • One writer at a time, other writers wait until the lock is released.
      • Multiple readers can read at the same time.
    • No checksums implemented to detect errors.

Bitcask UML Diagram

Elasticsearch and Kibana

  • Implemented in-process inside the Central Station as ElasticsearchIndexer.java.

  • Listener modelStationParquetAggregator invokes a Consumer<File> callback the moment a parquet file is finalized (writer closed + renamed with .parquet suffix). The callback hands the file to the indexer.

  • Async upload — single-thread ExecutorService with bounded queue (capacity 16) + CallerRunsPolicy backpressure. ES latency cannot block the Kafka consume loop. If ES is down, the aggregator keeps writing parquet; flushes resume when ES comes back.

  • Bulk indexingco.elastic.clients:elasticsearch-java 8.12.0. Rows streamed via ParquetReader<Group> + GroupReadSupport, batched at 500 docs per bulk request.

  • Single index weather-status with explicit mapping (created on first flush if absent):

    field ES type
    station_id long
    s_no long
    battery_status keyword
    status_timestamp date (epoch_second)
    humidity integer
    temperature integer
    wind_speed integer
  • Doc ID = {station_id}-{s_no} → idempotent. Replaying a parquet file upserts rather than duplicates.

  • Env varsES_HOST (default localhost), ES_PORT (default 9200).

  • Run ES + Kibana:

    docker run -d -p 9200:9200 -p 5601:5601 nshou/elasticsearch-kibana

    Then in Kibana → Stack Management → Data Views → create weather-status → Discover.

  • Kibana's visualisations confirming Battery status distribution of some stations confirming the battery distribution of stations.

  • Kibana's visualisations calculating the percentage of dropped messages from stations confirming the required percentage 10%.

Enterprise Integration Patterns

This system applies the following EIPs end-to-end:

1. Channel Adapter — Open-Meteo → Weather Station

Real weather readings are pulled from the Open-Meteo API. A dedicated adapter class fetches the data and reshapes it to the system's internal message format, then hands it to every producer. The producer itself stays decoupled from the external API — it only knows its own (latitude, longitude). Implemented in weatherStation.

2. Polling Consumer — Central Station ⇐ Kafka

The Central Station polls the weather-status-messages Kafka topic on a fixed cadence (consumer.poll(Duration)), consumes all weather station messages, and dispatches each one downstream (BitCask + Parquet). Implemented in WeatherStatusPollingConsumer.java.

3. Invalid Message Channel — null/malformed → RocksDB

Any message that fails validation (null fields, malformed payload) is diverted from the main processing pipeline into a separate RocksDB store on disk via the dedicated InvalidMessageChannel.java class. The central station never persists invalid records into the Parquet archive — they sit in their own channel for later inspection. Wired up in CentralStation.java.

4. Aggregator — 10K-record Parquet flush

For each station, the Parquet writer aggregates messages in memory and flushes a new Parquet file every 10,000 records. Files are partitioned by day / station_id. Each station's stream produces its own files, independent of other stations. Implemented in StationParquetAggregator.java.

5. Envelope Wrapper + Pipe and Filter — Kafka Processor

The Kafka Processor unwraps each weather-status-messages envelope, parses the payload, filters on humidity > 70%, then re-wraps matching records as raining-status messages and produces them to the raining-status-messages topic. Classic Pipe and Filter chained on Kafka Streams, with the Envelope Wrapper handling (de)serialization at the boundaries. Implemented in kafkaProcessor.

About

A distributed weather stations monitoring system

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors