Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion ingestor/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
# Runs every 15 minutes from either 4 AM -> 1:45 AM or 5 AM -> 2:45 AM ET depending on DST
@app.schedule(Cron("0/15", "0-6,9-23", "*", "*", "?", "*"))
def store_current_alerts(event):
    """Scheduled job: snapshot the current MBTA V3 alerts.

    Delegates all work to ``alerts.save_v3_alerts``.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    alerts.save_v3_alerts()


Expand All @@ -46,27 +47,30 @@ def store_current_alerts(event):
# STORE BLUEBIKES FEED
@app.schedule(Cron("0/5", "*", "*", "*", "?", "*"))
def bb_store_station_status(event):
    """Scheduled job: capture a Bluebikes station status snapshot.

    Delegates all work to ``bluebikes.store_station_status``.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    bluebikes.store_station_status()


# 10am UTC -> 6am EDT (5am EST)
@app.schedule(Cron(0, 10, "*", "*", "?", "*"))
def bb_store_station_info(event):
    """Scheduled job: capture the daily Bluebikes station info.

    Delegates all work to ``bluebikes.store_station_info``.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    bluebikes.store_station_info()


# 6am UTC -> 2am EDT
@app.schedule(Cron(0, 6, "*", "*", "?", "*"))
def bb_calc_daily_stats(event):
    """Scheduled job: compute Bluebikes daily stats for the previous day.

    The target day is ``date.today()`` minus one day, i.e. relative to the
    runtime's local clock.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    previous_day = date.today() - timedelta(days=1)
    bluebikes.calc_daily_stats(previous_day)


# Runs every 30 minutes from either 4 AM -> 1:30 AM or 5 AM -> 2:30 AM ET depending on DST
@app.schedule(Cron("0/30", "0-6,9-23", "*", "*", "?", "*"))
def update_delivered_trip_metrics(event):
"""Updates daily trip metrics for today (or yesterday before 9 AM UTC)."""
today = datetime.now()
""" Update yesterdays entry until 4/5 am (9 AM UTC)"""
if today.hour < 9:
today = today - timedelta(days=1)
daily_speeds.update_daily_table(today.date())
Expand All @@ -76,6 +80,7 @@ def update_delivered_trip_metrics(event):
# Update weekly and monthly tables. At 2/3 AM EST and also after we have updated yesterday's data.
@app.schedule(Cron(10, "7,12", "*", "*", "?", "*"))
def update_agg_trip_metrics(event):
    """Scheduled job: refresh the aggregate trip metric tables.

    Updates the "weekly" table first, then the "monthly" table.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    for granularity in ("weekly", "monthly"):
        agg_speed_tables.update_tables(granularity)

Expand All @@ -84,6 +89,7 @@ def update_agg_trip_metrics(event):
# The MBTA cleans up their data the next day (we suspect sometime after 4 AM). Update yesterday's data after this (and 2 days ago to be safe).
@app.schedule(Cron(0, 12, "*", "*", "?", "*"))
def update_delivered_trip_metrics_yesterday(event):
"""Re-updates trip metrics for yesterday and two days ago after MBTA data cleanup."""
today = datetime.now()
yesterday = (today - timedelta(days=1)).date()
two_days_ago = (today - timedelta(days=2)).date()
Expand All @@ -94,24 +100,28 @@ def update_delivered_trip_metrics_yesterday(event):
# 7:10am UTC -> 2:10/3:10am ET every day
@app.schedule(Cron(10, 7, "*", "*", "?", "*"))
def update_ridership(event):
    """Scheduled job: ingest the most recent ridership data.

    Delegates all work to ``ridership.ingest_ridership_data``.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    ridership.ingest_ridership_data()


# 7:20am UTC -> 2:20/3:20am ET every weekday
@app.schedule(Cron(20, 7, "?", "*", "MON-FRI", "*"))
def update_speed_restrictions(event):
    """Scheduled weekday job: refresh speed restriction data.

    Calls ``speed_restrictions.update_speed_restrictions`` with a lookback
    limit of two months.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    speed_restrictions.update_speed_restrictions(max_lookback_months=2)


# 7:30am UTC -> 2:30/3:30am ET every day
@app.schedule(Cron(30, 7, "*", "*", "?", "*"))
def update_time_predictions(event):
    """Scheduled job: refresh the prediction accuracy data.

    Delegates all work to ``predictions.update_predictions``.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    predictions.update_predictions()


# 8:00am UTC -> 3:00/4:00am ET and 11:00pm UTC -> 6:00/7:00pm ET every day
@app.schedule(Cron(0, "8,23", "*", "*", "?", "*"))
def update_gtfs(event):
    """Scheduled job: ingest GTFS feeds for the trailing three days.

    The date range passed on runs from three days before the current
    moment through the current date, both taken from ``datetime.now()``.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    now = datetime.now()
    lookback = timedelta(days=3)
    range_start = (now - lookback).date()
    range_end = now.date()
    gtfs.ingest_gtfs_feeds_to_dynamo_and_s3(date_range=(range_start, range_end))
Expand All @@ -120,6 +130,7 @@ def update_gtfs(event):
# 7:40am UTC -> 2:40/3:40am ET every day
@app.schedule(Cron(40, 7, "*", "*", "?", "*"))
def update_trip_metrics(event):
    """Scheduled job: ingest trip metrics with a seven-day lookback.

    Delegates to ``trip_metrics.ingest_recent_trip_metrics``.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    trip_metrics.ingest_recent_trip_metrics(lookback_days=7)


Expand All @@ -128,6 +139,7 @@ def update_trip_metrics(event):
# Runs 15 minutes after the daily update
@app.schedule(Cron(45, 8, "?", "*", "MON,TUE", "*"))
def update_weekly_alert_delays(event):
    """Scheduled job: roll daily delay data up into weekly totals.

    The window passed to ``delays.update_weekly_from_daily`` starts 15 days
    before the current moment and ends on the current date.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    now = datetime.now()
    # The lookback is 15 days, not 7, despite the "weekly" aggregation.
    window_start = (now - timedelta(days=15)).date()
    window_end = now.date()
    delays.update_weekly_from_daily(window_start, window_end)
Expand All @@ -136,6 +148,7 @@ def update_weekly_alert_delays(event):
# for daily delay uploads
@app.schedule(Cron(30, 8, "*", "*", "?", "*"))
def update_daily_alert_delays(event):
    """Scheduled job: update the daily delay table for the previous day.

    Calls ``delays.update_table`` with yesterday's date as the start and
    the current date as the end, both derived from ``datetime.now()``.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    now = datetime.now()
    range_start = (now - timedelta(days=1)).date()
    range_end = now.date()
    delays.update_table(range_start, range_end)
Expand All @@ -144,6 +157,12 @@ def update_daily_alert_delays(event):
# Manually triggered lambda for populating daily trip metric tables. Only needs to be run once.
@app.lambda_function()
def populate_delivered_trip_metrics(params, context):
"""Backfills the DeliveredTripMetrics table for all routes from 2016 to now.

Args:
params: Lambda function parameters (unused).
context: Lambda context object (unused).
"""
start_date = datetime.strptime("2016-01-15", constants.DATE_FORMAT_BACKEND)
end_date = datetime.now()
for route in constants.ALL_ROUTES:
Expand All @@ -153,6 +172,12 @@ def populate_delivered_trip_metrics(params, context):
# Manually triggered lambda for populating monthly or weekly tables. Only needs to be run once.
@app.lambda_function()
def populate_agg_delivered_trip_metrics(params, context):
"""Backfills monthly and weekly aggregate trip metric tables for all lines.

Args:
params: Lambda function parameters (unused).
context: Lambda context object (unused).
"""
for line in constants.LINES:
print(f"Populating monthly and weekly aggregate trip metrics for {line}")
agg_speed_tables.populate_table(line, "monthly")
Expand All @@ -164,6 +189,7 @@ def populate_agg_delivered_trip_metrics(params, context):
# No need to run on weekends
@app.schedule(Cron(0, 9, "?", "*", "MON-FRI", "*"))
def store_landing_data(event):
"""Uploads trip metrics and ridership data for the landing page to S3."""
print(
f"Uploading ridership and trip metric data for landing page from {constants.NINETY_DAYS_AGO_STRING} to {constants.ONE_WEEK_AGO_STRING}"
)
Expand All @@ -176,4 +202,5 @@ def store_landing_data(event):
# 9:30 UTC -> 4:30/5:30am ET every day (after GTFS and ridership have been ingested)
@app.schedule(Cron(30, 9, "*", "*", "?", "*"))
def update_service_ridership_dashboard(event):
    """Scheduled job: rebuild the service ridership dashboard JSON.

    Delegates all work to
    ``service_ridership_dashboard.create_service_ridership_dash_json``.

    Args:
        event: The schedule event passed by the framework (unused).
    """
    service_ridership_dashboard.create_service_ridership_dash_json()
14 changes: 14 additions & 0 deletions ingestor/chalicelib/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,24 @@


def key(day):
    """Return the S3 object key under which a day's V3 alerts are stored.

    Args:
        day: The service date; it is converted with ``str()``.

    Returns:
        A key of the form ``Alerts/v3/<day>.json.gz``.
    """
    return "Alerts/v3/" + str(day) + ".json.gz"


def save_v3_alerts():
"""Fetches current MBTA V3 alerts and appends them to today's alert file in S3.

Downloads the existing alert data for the current service date (if any),
merges in newly fetched alerts by ID, and uploads the updated set back
to S3.
"""
r_s = requests.get("https://api-v3.mbta.com/alerts")
alerts = r_s.json()

Expand Down
97 changes: 94 additions & 3 deletions ingestor/chalicelib/bluebikes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,24 @@
#################
# STATION STATUS TO BE PULLED FROM FEED EVERY 5 MINUTES
def get_station_status_key(date, timestamp):
    """Return the S3 object key for one station status snapshot.

    Args:
        date: The snapshot's date, used as the first path segment.
        timestamp: The snapshot's timestamp, used as the second path segment.

    Returns:
        A key of the form ``station_status/<date>/<timestamp>/bluebikes.csv``.
    """
    template = "station_status/{}/{}/bluebikes.csv"
    return template.format(date, timestamp)


def get_station_status():
"""Fetches the current Bluebikes station status from the GBFS feed.

Returns:
The parsed JSON response containing station status data.
"""
try:
resp = requests.get("https://gbfs.bluebikes.com/gbfs/en/station_status.json", timeout=15)
resp.raise_for_status()
Expand All @@ -32,6 +46,7 @@ def get_station_status():


def store_station_status():
"""Fetches current Bluebikes station status and uploads it to S3 as CSV."""
datajson = get_station_status()

timestamp = datajson.get("last_updated")
Expand All @@ -51,10 +66,19 @@ def store_station_status():
##################
# STATION INFO TO BE PULLED FROM FEED DAILY AT 6AM
def get_station_info_key(date):
    """Return the S3 object key for a day's station info CSV.

    Args:
        date: The date of the station info snapshot.

    Returns:
        A key of the form ``station_info/<date>/station_info.csv``.
    """
    template = "station_info/{}/station_info.csv"
    return template.format(date)


def store_station_info():
"""Fetches Bluebikes station and region info and uploads merged data to S3."""
# get station info
resp = requests.get("https://gbfs.bluebikes.com/gbfs/en/station_information.json")

Expand All @@ -80,16 +104,35 @@ def store_station_info():
# Neighbor stations and ridability calculated daily
##################
def get_distance(df, lat, lon, n_lat, n_lon):
    """Measure the distance between a station and a neighbor in one row.

    Args:
        df: A mapping-like row (e.g. a DataFrame row or Series) indexable
            by the four column names below.
        lat: Column name holding the first point's latitude.
        lon: Column name holding the first point's longitude.
        n_lat: Column name holding the neighbor's latitude.
        n_lon: Column name holding the neighbor's longitude.

    Returns:
        The ``.km`` value of ``distance.distance`` for the two points,
        i.e. the separation in kilometers.
    """
    origin = (df[lat], df[lon])
    neighbor = (df[n_lat], df[n_lon])
    return distance.distance(origin, neighbor).km


def haversine(lat, lon, n_lat, n_lon):
"""
Distance function impl. from StackOverflow compatible with numpy arrays.
Distances are slightly different than geopy, so perhaps we use 405 meters as cutoff to be kind?
"""Calculates haversine distance compatible with numpy arrays.

Args:
lat: Latitude(s) of the first point(s).
lon: Longitude(s) of the first point(s).
n_lat: Latitude(s) of the second point(s).
n_lon: Longitude(s) of the second point(s).

Returns:
Distance(s) in kilometers between the point pairs.
"""
radius = 6371.0
d_lat = np.radians(lat - n_lat)
Expand All @@ -101,10 +144,30 @@ def haversine(lat, lon, n_lat, n_lon):


def get_neighbor_key(date):
    """Return the S3 object key for a day's station neighbors CSV.

    Args:
        date: The date the neighbor file belongs to.

    Returns:
        A key of the form ``station_info/<date>/station_neighbors.csv``.
    """
    template = "station_info/{}/station_neighbors.csv"
    return template.format(date)


def calc_neighbors(date, exclude=[]):
"""Calculates neighboring stations within walking distance and uploads to S3.

Finds all station pairs within 400m, plus the nearest neighbor within
600m for each station, and writes the result to S3.

Args:
date: The date to use for fetching station info.
exclude: A list of station IDs to exclude (e.g. uninstalled stations).

Returns:
A DataFrame of station neighbor pairs with distances.
"""
# get station info
df = s3.download_csv_as_df(BUCKET, get_station_info_key(date))

Expand Down Expand Up @@ -137,10 +200,26 @@ def calc_neighbors(date, exclude=[]):


def get_rideability_key(date):
    """Return the S3 object key for a day's rideability CSV.

    Args:
        date: The date the rideability file belongs to.

    Returns:
        A key of the form ``rideability/<date>/rideability.csv``.
    """
    template = "rideability/{}/rideability.csv"
    return template.format(date)


def gather_single_day_data(single_day):
"""Downloads and concatenates all station status snapshots for a single day.

Args:
single_day: The date to gather data for.

Returns:
A DataFrame containing all station status observations for the day.
"""
keys = s3.ls(BUCKET, f"station_status/{single_day}")
dfs = [s3.download_csv_as_df(BUCKET, key) for key in keys]
df = pd.concat(dfs)
Expand All @@ -150,6 +229,18 @@ def gather_single_day_data(single_day):

# TODO: edge case with valet
def calc_daily_stats(day):
"""Calculates daily rideability statistics for all Bluebikes stations.

Determines what percentage of observations each station was "rideable"
(between 10% and 85% full and active) during operating hours (6 AM - 10 PM),
accounting for neighboring station availability.

Args:
day: The date to calculate stats for.

Returns:
A DataFrame with per-station rideability scores for the day.
"""
df = gather_single_day_data(day)

# find uninstalled stations to exclude from neighbor calculation
Expand Down
Loading