Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ So far we have:
- Store Bluebikes station status data every 5 min.
- Store ridership data
- Process and store speed restrictions
- Store hourly Boston weather data (temperature + condition) for chart overlays

To add a new lambda function, put the methods you need in a new file in chalicelib/.
Then add your trigger in app.py.
Expand Down
10 changes: 10 additions & 0 deletions ingestor/.chalice/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@
"iam_policy_file": "policy-service-ridership-dashboard.json",
"lambda_timeout": 900,
"lambda_memory_size": 1024
},
"store_hourly_weather": {
"iam_policy_file": "policy-weather.json",
"lambda_memory_size": 192,
"lambda_timeout": 30
},
"backfill_weather": {
"iam_policy_file": "policy-weather.json",
"lambda_memory_size": 256,
"lambda_timeout": 900
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions ingestor/.chalice/policy-weather.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "arn:*:logs:*:*:*"
},
{
"Action": "s3:ListBucket",
"Effect": "Allow",
"Resource": ["arn:aws:s3:::tm-mbta-performance"]
},
{
"Action": "s3:*",
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::tm-mbta-performance/Weather",
"arn:aws:s3:::tm-mbta-performance/Weather/*",
"arn:aws:s3:::tm-mbta-performance/Weather/hourly/*"
]
}
]
}
44 changes: 44 additions & 0 deletions ingestor/.chalice/resources.json
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,50 @@
"Properties": {
"BucketName": "tm-shuttle-positions"
}
},
"StoreHourlyWeather": {
"Type": "AWS::Serverless::Function",
"Properties": {
"Description": "Stores hourly Boston weather data from Open-Meteo for chart overlays",
"Environment": {
"Variables": {
"DD_API_KEY": {
"Ref": "DDApiKey"
},
"DD_VERSION": {
"Ref": "GitVersion"
},
"DD_TAGS": {
"Ref": "DDTags"
},
"DD_GIT_REPOSITORY_URL": {
"Ref": "DDGitRepositoryUrl"
}
}
}
}
},
"BackfillWeather": {
"Type": "AWS::Serverless::Function",
"Properties": {
"Description": "Manually-triggered backfill of historical hourly weather from Open-Meteo archive",
"Environment": {
"Variables": {
"DD_API_KEY": {
"Ref": "DDApiKey"
},
"DD_VERSION": {
"Ref": "GitVersion"
},
"DD_TAGS": {
"Ref": "DDTags"
},
"DD_GIT_REPOSITORY_URL": {
"Ref": "DDGitRepositoryUrl"
}
}
}
}
}
}
}
16 changes: 16 additions & 0 deletions ingestor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
service_ridership_dashboard,
speed_restrictions,
trip_metrics,
weather,
)
from datadog_lambda.wrapper import datadog_lambda_wrapper

Expand Down Expand Up @@ -177,3 +178,18 @@ def store_landing_data(event):
@app.schedule(Cron(30, 9, "*", "*", "?", "*"))
def update_service_ridership_dashboard(event):
    """Scheduled handler that regenerates the service/ridership dashboard JSON.

    Delegates to ``service_ridership_dashboard.create_service_ridership_dash_json``.
    The schedule ``event`` is part of the handler signature but is not read.
    """
    rebuild_dashboard = service_ridership_dashboard.create_service_ridership_dash_json
    rebuild_dashboard()


#################
# STORE HOURLY WEATHER
# Every hour at :05 — fetch latest Boston hourly weather and merge into today's S3 file.
@app.schedule(Cron(5, "*", "*", "*", "?", "*"))
def store_hourly_weather(event):
    """Scheduled handler (minute 5 of every hour) for the hourly weather ingest.

    Delegates to ``weather.ingest_hourly_weather``. The schedule ``event`` is
    part of the handler signature but is not read.
    """
    run_ingest = weather.ingest_hourly_weather
    run_ingest()


# Manually triggered lambda for backfilling historical weather from Open-Meteo's archive API.
# Payload: {"start_date": "YYYY-MM-DD", "end_date": "YYYY-MM-DD"}
@app.lambda_function()
def backfill_weather(params, context):
    """Manually invoked handler that backfills weather for a date range.

    Args:
        params: Invocation payload; must contain ``"start_date"`` and
            ``"end_date"`` (documented above as ``YYYY-MM-DD`` strings).
        context: Lambda context object; not read.

    Raises:
        KeyError: If either date key is missing from ``params``.
    """
    # Look both keys up before delegating, in the same order as the call arguments.
    first_day = params["start_date"]
    last_day = params["end_date"]
    weather.backfill_weather(first_day, last_day)
82 changes: 82 additions & 0 deletions ingestor/chalicelib/tests/test_weather.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from ..weather.constants import WEATHER_CODE_TO_CONDITION, key
from ..weather.ingest import _parse_hourly


# Column-oriented hourly payload handed to _parse_hourly by the tests below:
# one list per field, index-aligned with "time". It holds three samples that
# span two calendar dates (two on 2026-04-22, one on 2026-04-23) so that
# grouping by date can be checked. The fractional temperature and wind values
# (45.23, 8.47) are the ones the record-shape test expects back as 45.2 / 8.5.
HOURLY_FIXTURE = {
    "time": [
        "2026-04-22T00:00",
        "2026-04-22T01:00",
        "2026-04-23T00:00",
    ],
    "temperature_2m": [45.23, 44.1, 50.0],
    "weather_code": [0, 61, 75],
    "precipitation": [0.0, 0.05, 0.0],
    "relative_humidity_2m": [78, 82, 60],
    "wind_speed_10m": [8.47, 9.2, 5.0],
}


def test_parse_hourly_groups_by_date():
    """Samples are bucketed per calendar date and keyed by their timestamp."""
    grouped = _parse_hourly(HOURLY_FIXTURE)

    expected_dates = {"2026-04-22", "2026-04-23"}
    assert set(grouped.keys()) == expected_dates

    first_day_hours = grouped["2026-04-22"]
    assert set(first_day_hours.keys()) == {"2026-04-22T00:00", "2026-04-22T01:00"}


def test_parse_hourly_record_shape():
    """The first fixture sample is parsed into the expected record fields."""
    parsed = _parse_hourly(HOURLY_FIXTURE)
    record = parsed["2026-04-22"]["2026-04-22T00:00"]

    # Checked field by field (not as a whole-dict comparison) so that any extra
    # keys on the record do not affect the outcome.
    expected_fields = (
        ("temperature_f", 45.2),
        ("weather_code", 0),
        ("condition", "clear"),
        ("precipitation_in", 0.0),
        ("humidity_pct", 78),
        ("wind_mph", 8.5),
    )
    for field, value in expected_fields:
        assert record[field] == value


def test_weather_code_mapping():
    """Spot-check one weather code for each coarse condition label."""
    samples = (
        (0, "clear"),
        (3, "cloudy"),
        (45, "fog"),
        (63, "rain"),
        (75, "snow"),
        (95, "storm"),
    )
    for code, label in samples:
        assert WEATHER_CODE_TO_CONDITION[code] == label


def test_unknown_code_falls_back():
    """A weather code that is not in the mapping is reported as "unknown"."""
    payload = {
        "time": ["2026-04-22T00:00"],
        "temperature_2m": [40.0],
        "weather_code": [999],  # deliberately absent from the code table
        "precipitation": [0.0],
        "relative_humidity_2m": [50],
        "wind_speed_10m": [0.0],
    }
    parsed = _parse_hourly(payload)
    only_record = parsed["2026-04-22"]["2026-04-22T00:00"]
    assert only_record["condition"] == "unknown"


def test_merge_preserves_other_hours_and_overwrites_same_hour():
    """Dict-merging parsed hours into stored hours keeps old hours and replaces overlaps.

    The merge exercised here is a plain ``dict.update`` of the stored day with
    the freshly parsed day.
    """
    stored_day = {
        "2026-04-22T00:00": {"temperature_f": 40.0, "condition": "clear"},
        "2026-04-22T01:00": {"temperature_f": 39.0, "condition": "clear"},
    }
    # 01:00 overlaps a stored hour; 02:00 is new.
    fresh_payload = {
        "time": ["2026-04-22T01:00", "2026-04-22T02:00"],
        "temperature_2m": [41.0, 42.0],
        "weather_code": [61, 2],
        "precipitation": [0.1, 0.0],
        "relative_humidity_2m": [80, 75],
        "wind_speed_10m": [7.0, 6.0],
    }
    fresh_day = _parse_hourly(fresh_payload)["2026-04-22"]

    stored_day.update(fresh_day)

    untouched = stored_day["2026-04-22T00:00"]
    overwritten = stored_day["2026-04-22T01:00"]
    added = stored_day["2026-04-22T02:00"]

    assert untouched["temperature_f"] == 40.0
    assert overwritten["condition"] == "rain"
    assert overwritten["temperature_f"] == 41.0
    assert added["condition"] == "cloudy"


def test_key_format():
    """``key`` places the day under the ``Weather/hourly/`` prefix with a ``.json.gz`` suffix."""
    expected = "Weather/hourly/2026-04-22.json.gz"
    actual = key("2026-04-22")
    assert actual == expected
3 changes: 3 additions & 0 deletions ingestor/chalicelib/weather/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Package entry points: re-export the two ingest functions so callers can use
# them as `weather.ingest_hourly_weather()` and `weather.backfill_weather(...)`
# without importing the `ingest` submodule directly.
from chalicelib.weather.ingest import backfill_weather, ingest_hourly_weather

__all__ = ["backfill_weather", "ingest_hourly_weather"]
48 changes: 48 additions & 0 deletions ingestor/chalicelib/weather/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Name of the S3 bucket used by the weather module.
BUCKET = "tm-mbta-performance"

# Downtown Boston (near City Hall)
LATITUDE, LONGITUDE = 42.3601, -71.0589

# Open-Meteo endpoints: forecast API and historical archive API.
FORECAST_URL = "https://api.open-meteo.com/v1/forecast"
ARCHIVE_URL = "https://archive-api.open-meteo.com/v1/archive"

# Comma-separated list of hourly variable names, one per line for readability.
HOURLY_FIELDS = ",".join(
    (
        "temperature_2m",
        "weather_code",
        "precipitation",
        "relative_humidity_2m",
        "wind_speed_10m",
    )
)

# WMO weather codes → coarse condition labels the frontend can rely on.
# https://open-meteo.com/en/docs — "Weather variable documentation"
# Labels used: "clear", "cloudy", "fog", "rain", "snow", "storm".
# Drizzle, freezing rain and rain showers are all folded into "rain"; snow
# grains and snow showers into "snow".
WEATHER_CODE_TO_CONDITION = {
    0: "clear",  # Clear sky
    1: "clear",  # Mainly clear
    2: "cloudy",  # Partly cloudy
    3: "cloudy",  # Overcast
    45: "fog",  # Fog
    48: "fog",  # Depositing rime fog
    51: "rain",  # Light drizzle
    53: "rain",  # Moderate drizzle
    55: "rain",  # Dense drizzle
    56: "rain",  # Light freezing drizzle
    57: "rain",  # Dense freezing drizzle
    61: "rain",  # Slight rain
    63: "rain",  # Moderate rain
    65: "rain",  # Heavy rain
    66: "rain",  # Light freezing rain
    67: "rain",  # Heavy freezing rain
    71: "snow",  # Slight snow fall
    73: "snow",  # Moderate snow fall
    75: "snow",  # Heavy snow fall
    77: "snow",  # Snow grains
    80: "rain",  # Slight rain showers
    81: "rain",  # Moderate rain showers
    82: "rain",  # Violent rain showers
    85: "snow",  # Slight snow showers
    86: "snow",  # Heavy snow showers
    95: "storm",  # Thunderstorm (slight or moderate)
    # Thunderstorm forecast with hail is only available in Central Europe, so
    # the two hail codes below are presumably never returned for Boston; they
    # are mapped anyway so the table covers every documented code.
    96: "storm",  # Thunderstorm with slight hail
    99: "storm",  # Thunderstorm with heavy hail
}


def key(day):
    """Return the object key for one day's hourly weather file.

    Args:
        day: The day to build the key for. It is converted with ``str()``, so
            an ISO date string such as ``"2026-04-22"`` is used unchanged.

    Returns:
        A string of the form ``Weather/hourly/<day>.json.gz``.
    """
    day_text = str(day)
    return "Weather/hourly/{}.json.gz".format(day_text)
Loading
Loading