Skip to content

Commit 74e54e1

Browse files
authored
feat: better replication log metrics (#2394)
1 parent dcd8a9f commit 74e54e1

File tree

4 files changed

+25
-11
lines changed

4 files changed

+25
-11
lines changed

.changeset/angry-seahorses-turn.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@core/sync-service": patch
3+
---
4+
5+
feat: add receive/replication lag to exposed metrics

packages/sync-service/lib/electric/postgres/replication_client.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ defmodule Electric.Postgres.ReplicationClient do
269269
[:electric, :postgres, :replication, :transaction_received],
270270
%{
271271
monotonic_time: System.monotonic_time(),
272+
receive_lag: DateTime.diff(DateTime.utc_now(), txn.commit_timestamp, :millisecond),
272273
bytes: byte_size(data),
273274
count: 1,
274275
operations: txn.num_changes

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -366,23 +366,25 @@ defmodule Electric.Shapes.Consumer do
366366
actual_num_changes: num_changes
367367
})
368368

369+
shape_status.set_latest_offset(shape_status_state, shape_handle, last_log_offset)
370+
371+
notify_listeners(registry, :new_changes, shape_handle, last_log_offset)
372+
373+
lag = calculate_replication_lag(txn)
374+
OpenTelemetry.add_span_attributes(replication_lag: lag)
375+
369376
:telemetry.execute(
370377
[:electric, :storage, :transaction_stored],
371378
%{
372379
duration: System.monotonic_time() - timestamp,
373380
bytes: new_log_state.current_txn_bytes,
374381
count: 1,
375-
operations: num_changes
382+
operations: num_changes,
383+
replication_lag: lag
376384
},
377385
Map.new(shape_attrs(state.shape_handle, state.shape))
378386
)
379387

380-
shape_status.set_latest_offset(shape_status_state, shape_handle, last_log_offset)
381-
382-
notify_listeners(registry, :new_changes, shape_handle, last_log_offset)
383-
384-
report_replication_lag(txn)
385-
386388
{:cont, notify(txn, %{state | log_state: new_log_state})}
387389

388390
true ->
@@ -543,14 +545,12 @@ defmodule Electric.Shapes.Consumer do
543545
]
544546
end
545547

546-
defp report_replication_lag(%Transaction{commit_timestamp: commit_timestamp}) do
548+
defp calculate_replication_lag(%Transaction{commit_timestamp: commit_timestamp}) do
547549
# Compute time elapsed since commit
548550
# since we are comparing PG's clock with our own
549551
# there may be a slight skew so we make sure not to report negative lag.
550552
# Since the lag is only useful when it becomes significant, a slight skew doesn't matter.
551553
now = DateTime.utc_now()
552-
lag = Kernel.max(0, DateTime.diff(now, commit_timestamp, :millisecond))
553-
554-
OpenTelemetry.add_span_attributes(replication_lag: lag)
554+
Kernel.max(0, DateTime.diff(now, commit_timestamp, :millisecond))
555555
end
556556
end

packages/sync-service/lib/electric/telemetry/stack_telemetry.ex

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,14 @@ with_telemetry [OtelMetricExporter, Telemetry.Metrics] do
238238
distribution("electric.storage.make_new_snapshot.stop.duration",
239239
unit: {:native, :millisecond},
240240
keep: for_stack
241+
),
242+
distribution("electric.postgres.replication.transaction_received.receive_lag",
243+
unit: :millisecond,
244+
keep: for_stack
245+
),
246+
distribution("electric.storage.transaction_stored.replication_lag",
247+
unit: :millisecond,
248+
keep: for_stack
241249
)
242250
] ++ prometheus_metrics(opts)
243251
end

0 commit comments

Comments
 (0)