feat: Add per-stream sync progress tracking with Rich Live status display #1011

Aaron ("AJ") Steers (aaronsteers) wants to merge 20 commits into `main` from `devin/1775250923-sync-progress-tracking`.
Conversation
…sync_status

Add a `with_stream_progress` boolean parameter to the `get_cloud_sync_status` MCP tool. When enabled, it fetches the current connection state and catalog to compute estimated progress percentages for streams with datetime-based cursors. Core progress calculation logic lives in `airbyte/cloud/_sync_progress.py` (public module), with the MCP tool as a thin wrapper.

Progress formula: `(current_cursor - sync_start) / (now - sync_start)`

Only datetime-based cursors are supported; others are skipped gracefully.
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1775250923-sync-progress-tracking' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1775250923-sync-progress-tracking'

PR Slash Commands: Airbyte Maintainers can execute slash commands on this PR.

Community Support: Questions? Join the #pyairbyte channel in our Slack workspace.
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.

📝 Walkthrough

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~30 minutes

Would you like a short usage example or a unit-test checklist for the new progress computation, wdyt?

🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 2
🧹 Nitpick comments (4)
airbyte/mcp/cloud.py (1)
Lines 699-707: Consider adding a try/except for graceful degradation if progress computation fails?

If `dump_raw_state()`, `dump_raw_catalog()`, or `compute_stream_progress()` throws an unexpected error (e.g., malformed API response, network issue), the entire `get_cloud_sync_status` call will fail even though the core sync status info was already retrieved successfully. Would it make sense to wrap this block so that progress computation failures don't break the main response? Something like:
💡 Graceful degradation option
```diff
 if with_stream_progress:
+    try:
         state_data = connection.dump_raw_state()
         catalog_data = connection.dump_raw_catalog()
         result["stream_progress"] = compute_stream_progress(
             state_data=state_data,
             catalog_data=catalog_data,
             sync_start_time=sync_result.start_time,
         )
+    except Exception as e:
+        result["stream_progress"] = None
+        result["stream_progress_error"] = str(e)
```

This way callers still get the main status info even if progress estimation hits an edge case. Totally optional though; fail-fast might be preferable for debugging. wdyt?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/mcp/cloud.py` around lines 699-707: Wrap the progress computation in get_cloud_sync_status with a try/except so failures in dump_raw_state(), dump_raw_catalog(), or compute_stream_progress() don't abort the whole call: call connection.dump_raw_state() / connection.dump_raw_catalog() and compute_stream_progress(...) inside a try block, on exception catch and log the error (including exception details) and either set result["stream_progress"] = None or skip the key, then continue returning the main sync status; reference the functions dump_raw_state, dump_raw_catalog, compute_stream_progress and the surrounding get_cloud_sync_status logic when making the change.

airbyte/cloud/_sync_progress.py (3)
Lines 131-136: The single-value datetime heuristic might benefit from debug logging.

This "last resort" fallback picks a cursor value only when exactly one string field parses as a datetime. This is clever, but when it kicks in (or fails to find a match), it could be helpful to log which path was taken for debugging purposes. Would adding a debug log here help with troubleshooting when progress calculations seem off? wdyt?
💡 Optional logging
```diff
     if len(parsed) == 1:
+        logger.debug(
+            "Using single-value datetime heuristic for cursor: %s",
+            parsed[0][0],
+        )
         return parsed[0][0]
     return None
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cloud/_sync_progress.py` around lines 131 - 136, Add debug logging in the "last resort" single-string datetime heuristic inside _sync_progress.py: when you build string_values and datetime_values (using _try_parse_datetime_cursor) emit a debug log showing string_values and the parsed list; if len(parsed) == 1 log the chosen cursor value and return it, otherwise log that no unique datetime cursor was found and include parsed contents for troubleshooting; place logs adjacent to the existing variables (string_values, datetime_values, parsed) so it's clear which path was taken.
Lines 175-176: Consider wrapping pydantic parsing in try/except for clearer error handling?

If `state_data` doesn't conform to the expected schema (e.g., missing required fields, wrong types), `ConnectionStateResponse(**state_data)` will raise a pydantic `ValidationError`. This could produce a confusing traceback for callers. Would wrapping this in a try/except to return an empty result or raise a domain-specific exception improve the debugging experience? Something like:
💡 Possible defensive handling
```diff
+    try:
         state = ConnectionStateResponse(**state_data)
+    except Exception as e:
+        logger.warning("Failed to parse connection state: %s", e)
+        return []
     streams: list[StreamState] = _get_stream_list(state)
```

Just a thought; the current behavior of letting the error propagate might be fine if you prefer fail-fast semantics here. wdyt?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cloud/_sync_progress.py` around lines 175 - 176, Wrap the pydantic parsing of state_data in a try/except to handle malformed input: surround the ConnectionStateResponse(**state_data) call in a try block, catch pydantic.errors.ValidationError (imported as ValidationError) and either return an empty streams list (so streams: list[StreamState] = []) or raise a small domain-specific exception (e.g., StateParseError) with a clear message that includes the original ValidationError; ensure the rest of the logic still calls _get_stream_list(state) only on successful parsing.
Lines 222-226: Progress formula assumes the cursor datetime advances with wall-clock time. Worth documenting this assumption?

The formula `(cursor_dt - sync_start_time) / (now - sync_start_time)` implicitly assumes that cursor timestamps move forward roughly in sync with wall-clock time. This works well for real-time event streams but may produce 0% progress for historical backfills.

The clamping to `[0.0, 1.0]` handles edge cases gracefully, but users might be confused when they see 0% progress on a sync that's actually making headway through old data. Perhaps adding a note to the function docstring about this limitation would help set expectations? wdyt?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cloud/_sync_progress.py` around lines 222 - 226, Update the function's docstring that computes progress using (cursor_dt - sync_start_time) / (now - sync_start_time) to explicitly state the assumption that cursor_dt advances roughly with wall-clock time and that this approach will under-report progress for historical backfills (potentially yielding 0% despite work being done); mention the clamping to [0.0, 1.0] and its effect on reported values (entry["progress_pct"]) and suggest callers use a different metric for backfills if needed.
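A quick numeric illustration of the wall-clock concern above, using made-up dates: a backfill cursor sits far before the sync start, so the raw ratio is negative and the clamp reports 0%.

```python
from datetime import datetime, timezone

sync_start = datetime(2026, 4, 1, tzinfo=timezone.utc)
now = datetime(2026, 4, 3, tzinfo=timezone.utc)
cursor = datetime(2025, 12, 27, tzinfo=timezone.utc)  # historical backfill date

# Dividing one timedelta by another yields a plain float ratio.
raw = (cursor - sync_start) / (now - sync_start)
clamped = max(0.0, min(1.0, raw))
print(raw, clamped)  # -47.5 0.0
```

Even though the sync is making headway through old data, the clamped value stays at 0.0 until the cursor crosses the sync start time.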
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@airbyte/cloud/_sync_progress.py`:
- Around line 8-17: The docstring/formula in airbyte/cloud/_sync_progress.py is
inaccurate: the implementation for computing progress uses wall-clock
sync_start_time (variables sync_start_time, now, cursor_dt) not
sync_start_cursor, so update the function/class docstring that documents the
formula to reflect the actual computation (use (cursor_dt - sync_start_time) /
(now - sync_start_time) in words) and add a short note that this is a wall-clock
heuristic that only yields meaningful progress when source cursor timestamps
roughly track real time (it will clamp to 0% for historical backfills); keep
references to the existing variable names (sync_start_time, sync_start_cursor,
cursor_dt, now) so readers can find the code and understand the limitation.
- Around line 41-43: The docstring for the datetime parsing function
(parse_datetime) is inaccurate: the code only calls datetime.fromisoformat()
(via datetime.fromisoformat in _sync_progress.py) and has no fallback patterns;
update the function's docstring to say it only attempts ISO 8601 parsing and
returns None if parsing fails (or alternatively implement a fallback loop using
datetime.strptime with a curated list of common formats like "%Y-%m-%d
%H:%M:%S", "%Y-%m-%d", "%m/%d/%Y %H:%M:%S" and proper timezone handling if you
prefer adding formats) — keep the change limited to the parse_datetime docstring
or the parsing logic accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 04b0b44d-6780-4866-b6d8-ed5245883b36
📒 Files selected for processing (2)
- airbyte/cloud/_sync_progress.py
- airbyte/mcp/cloud.py
Actionable comments posted: 3
🧹 Nitpick comments (1)
airbyte/cloud/_sync_progress.py (1)
Lines 142-224: Could we add focused unit tests for the heuristics and edge cases here, wdyt?

This function is doing non-trivial parsing/clamping logic; a small test matrix would reduce regression risk.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cloud/_sync_progress.py` around lines 142 - 224, Add focused unit tests for compute_stream_progress to cover its datetime parsing and clamping heuristics: write tests that call compute_stream_progress with controlled sync_start_time and now (use now param) to validate proper handling of timezone-naive and timezone-aware sync_start_time, ensure _try_parse_datetime_cursor is exercised with valid ISO datetimes, non-datetime strings and null cursor values (via _find_cursor_value_in_state or by passing state_data lacking cursor), verify denominator zero/negative cases (now == sync_start_time and now before sync_start_time) produce the "Sync start time is not before current time." reason, and assert clamping at 0.0 and 1.0 for cursors before/after the window; also include a test that uses catalog_data to trigger _extract_cursor_field_from_catalog path. Ensure assertions check stream_name/stream_namespace, cursor_field, cursor_value, progress_pct (rounded to 4 decimals) and reason messages.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@airbyte/cloud/_sync_progress.py`:
- Around line 82-99: The catalog parsing currently assumes cursorField is a list
and returns only the first segment (cursor_field[0]), which drops nested path
information; update the logic that inspects cursor_field (the variable derived
from config.get("cursorField") in the streams loop) to accept both list and
string shapes, preserve the full path by returning the entire list joined into a
single dot-delimited path (or returning the original string if provided), and
add robust type checks (e.g., ensure isinstance(cursor_field, (list, str)) and
that list elements are str) so nested cursors resolve correctly; apply the same
fix to the second occurrence of this logic around the other block (lines
118-123) to keep behavior consistent.
- Around line 169-170: The ConnectionStateResponse(**state_data) construction
can raise and abort progress calculation; wrap the model instantiation and
subsequent call to _get_stream_list in a try/except that catches exceptions from
malformed state_data, log a warning (including the exception) and fall back to
an empty state/streams so opt-in progress does not hard-fail; specifically,
around ConnectionStateResponse and the call to _get_stream_list(state) ensure
that on exception you set streams = [] (and state = None) and continue the
progress path instead of propagating the error.
- Around line 58-61: Normalize ISO timestamps that end with a literal "Z" before
calling datetime.fromisoformat in the parsing code in
airbyte/cloud/_sync_progress.py: detect when the string variable stripped
endswith("Z") and replace that trailing "Z" with "+00:00" (only when it's the
timezone marker) so datetime.fromisoformat(stripped) succeeds on Python 3.10;
keep the existing try/except around datetime.fromisoformat to preserve other
fallback behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 2aa5fd09-34f1-4ef6-a01f-cec8eb821559
📒 Files selected for processing (1)
airbyte/cloud/_sync_progress.py
…paths, error handling
…TypeError, KeyError)
Runtime Test Results

| Test | Result | Detail |
|---|---|---|
| Opt-in (no flag) | Passed | No stream_progress key when flag omitted |
| Structured data (Kortx) | Passed | 2 streams, all 6 keys present, cursor field = segments.date |
| Quantitative validity (GA4) | Passed* | 55 streams, all progress_pct = 0.0 (valid, see escalation) |
| Cross-connection consistency (GA4) | Passed | Consistent structure across different connection |
| Explicit false | Passed | No stream_progress key when explicitly false |

*Caveat: Could not verify non-zero progress with available test data.
Sample output: GA4 `with_stream_progress=true` (3 of 55 streams)

```json
[
  {"stream_name": "tech_device_category_report", "cursor_field": "date",
   "cursor_value": "2025-12-27", "progress_pct": 0.0, "reason": null},
  {"stream_name": "demographic_age_report", "cursor_field": "date",
   "cursor_value": "2024-04-03", "progress_pct": 0.0, "reason": null},
  {"stream_name": "four_weekly_active_users", "cursor_field": "date",
   "cursor_value": "2026-01-23", "progress_pct": 0.0, "reason": null}
]
```

Progress = 0.0 because (2025-12-27 - 2026-04-01) / (2026-04-03 - 2026-04-01) = -96d/2d, clamped to 0.0.
- Use previous sync's cursor as baseline instead of sync_start_time
- Return raw factors: cursor_datetime, previous_cursor_value, previous_cursor_datetime, sync_start_time, target_datetime
- Return progress_pct=null with reason for historical backfills
- Refactor into helper functions to satisfy complexity limits
♻️ Duplicate comments (1)
airbyte/cloud/_sync_progress.py (1)
Lines 102-106: ⚠️ Potential issue | 🟠 Major

Could we harden catalog traversal to fail soft on malformed `catalog_data`, wdyt?

At line 103, the loop assumes every `streams` item is a dict and immediately calls `.get(...)`; malformed payloads can raise and abort `compute_stream_progress()` for all streams.

Proposed defensive patch:
```diff
 def _extract_cursor_field_from_catalog(
     catalog: dict[str, Any],
     stream_name: str,
     stream_namespace: str | None,
 ) -> str | None:
@@
     streams = catalog.get("streams", [])
+    if not isinstance(streams, list):
+        return None
     for stream_entry in streams:
-        config = stream_entry.get("config", {})
-        stream_info = stream_entry.get("stream", {})
+        if not isinstance(stream_entry, dict):
+            continue
+        config = stream_entry.get("config", {})
+        stream_info = stream_entry.get("stream", {})
+        if not isinstance(config, dict) or not isinstance(stream_info, dict):
+            continue
@@
         cursor_field = config.get("cursorField")
         if isinstance(cursor_field, str) and cursor_field:
             return cursor_field
-        if isinstance(cursor_field, list) and cursor_field:
-            return ".".join(str(segment) for segment in cursor_field)
+        if (
+            isinstance(cursor_field, list)
+            and cursor_field
+            and all(isinstance(segment, str) and segment for segment in cursor_field)
+        ):
+            return ".".join(cursor_field)
```

Also applies to: 116-121
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cloud/_sync_progress.py` around lines 102 - 106, The loop over catalog["streams"] in compute_stream_progress blindly assumes each stream entry is a dict and can cause a crash on malformed catalog_data; update the traversal to first ensure catalog.get("streams") is a list and in the for-loop guard each stream_entry with an isinstance(..., dict) check (or equivalent truthy mapping check) before calling .get on it, skip any non-dict entries (optionally log a warning) and do the same defensive check in the second loop handling stream entries around the block that currently spans lines 116-121 so malformed items are skipped rather than raising.
🧹 Nitpick comments (1)
airbyte/cloud/_sync_progress.py (1)
Lines 255-272: Could we add a focused unit-test matrix for baseline-selection branches, wdyt?

These branches control the most user-visible semantics (`previous cursor` anchor, fallback to `sync_start_time`, negative numerator, and denominator guard) and are easy to regress without tests. I can draft a compact table-driven test set for these exact cases if you want.

Also applies to: 293-319
Also applies to: 293-319
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cloud/_sync_progress.py` around lines 255 - 272, Add a focused, table-driven unit test for the baseline-selection and guard branches in the progress computation (covering the case where prev_cursor_dt is used vs when sync_start_time is used as range_start, the denominator <= 0 early-return, and the numerator < 0 path that calls _set_negative_progress_reason). Write tests that call the progress computation function (the function containing variables prev_cursor_dt, sync_start_time, cursor_dt, now and returning entry["progress_pct"] and entry["reason"]) with combinations of prev_cursor_dt present/None, cursor_dt before/after range_start, and now equal to or before range_start to assert expected progress_pct and reason values; add equivalent tests for the sibling block around lines 293-319. Ensure each case is table-driven (inputs + expected outputs) and that the negative-numerator path verifies _set_negative_progress_reason behavior via the returned entry.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 92745e3c-262a-45c0-8150-39c1c6b0cb88
📒 Files selected for processing (2)
- airbyte/cloud/_sync_progress.py
- airbyte/mcp/cloud.py
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte/mcp/cloud.py
Add get_previous_sync_state() to CloudConnection that retrieves the final committed state from the most recent completed sync job. This provides the baseline cursor values needed for accurate progress calculation. Wire into get_cloud_sync_status MCP tool to pass previous_state_data to compute_stream_progress().
Actionable comments posted: 2
🧹 Nitpick comments (1)
airbyte/cloud/connections.py (1)
Line 398: Could we make the job scan depth configurable (or page until a succeeded job is found), wdyt?

On line 398, `limit=5` is a tight cap and can miss the latest usable succeeded job after a short failure streak, which makes progress baselining less reliable.
Verify each finding against the current code and only fix it if needed. In `@airbyte/cloud/connections.py` at line 398, The hardcoded limit=5 on the get_previous_sync_logs call (variable previous_jobs) can miss a recent succeeded job; make the scan depth configurable or implement paging until a succeeded job is found: add a configurable parameter (e.g., job_scan_depth or max_pages) to the surrounding function or class, replace the fixed limit=5 when calling get_previous_sync_logs, or implement a loop that calls get_previous_sync_logs with increasing offsets/pages until a succeeded job is encountered or the configured max is reached; update callers and any defaults to preserve current behavior if not provided.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@airbyte/cloud/connections.py`:
- Around line 411-421: The current logic returns None as soon as a job has no
attempts or its last attempt's state is falsy, and it rejects empty dicts;
instead, change the loop to continue scanning earlier succeeded jobs when
attempts is empty or when state is missing/None, and accept an empty dict as a
valid state by using isinstance(state, dict) (without also requiring truthiness)
to return it; locate the block that accesses attempts, last_attempt, output, and
state and: (1) remove early returns for missing attempts/state so the
surrounding loop moves to prior jobs, and (2) change the state check to if
isinstance(state, dict): return state; only return None after all candidate jobs
have been examined.
- Around line 405-407: Replace the string conversion and literal comparison
(status = str(job.get_job_status()); if status != "succeeded") with an
enum-based check using the job status enum or the shared FINAL_STATUSES set:
call job.get_job_status() directly and either compare to the succeeded enum
value (e.g., JobStatus.SUCCEEDED) or test membership in FINAL_STATUSES; update
the condition to use that enum/membership check so it matches the project's
existing pattern used in sync_results.py.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 336c25f7-0044-49a8-a381-bc799d75605e
📒 Files selected for processing (2)
- airbyte/cloud/connections.py
- airbyte/mcp/cloud.py
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte/mcp/cloud.py
Addressing the 4 nitpick suggestions from Review 6:

1. Try/except for graceful degradation in get_cloud_sync_status (mcp/cloud.py lines 699-707)
2. Debug logging for single-value datetime heuristic (_sync_progress.py lines 131-136)
3. Try/except for pydantic parsing (_sync_progress.py lines 175-176)
4. Progress formula assumption documentation (_sync_progress.py lines 222-226)
Addressing Review 13 nitpick (unit tests for heuristics and edge cases): Agreed that unit tests are a good follow-up item, but not blocking for initial merge. The function has been validated end-to-end against real Airbyte API responses.
Addressing Review 16:

Duplicate comment (catalog traversal hardening, lines 102-106): Already addressed in commit 4a0913d; the catalog traversal now handles malformed entries gracefully.

Nitpick (unit tests for baseline-selection branches, lines 255-272): Same assessment as Review 13: valuable follow-up but not blocking for initial merge. The baseline-selection logic (prev_cursor_dt vs sync_start_time fallback, denominator guard, negative numerator handling) has been validated via E2E testing against live connections with synthetic and real previous state data.
Addressing Review 21:

Nitpick (configurable job scan depth, line 398): The 2 actionable comments from this review (enum comparison and continue-scanning loop) were both fixed in commit c310209.
- Bump JOB_WAIT_INTERVAL_SECS from 2.0 to 5.0
- Add with_rich_status_updates: bool | int = False param to SyncResult.wait_for_completion() and CloudConnection.run_sync()
- Rich Live table shows per-stream progress (cursor, previous cursor, progress %, status/reason) with configurable polling interval
- Min polling interval is 15s; values below are clamped with warning
- True defaults to 15s poll frequency
- ValueError if wait=False and with_rich_status_updates is truthy
- When rich updates enabled, the rich interval is the sole poll cadence
- Snapshot dump_raw_state() in run_sync() before triggering sync
- Store cached state on SyncResult._pre_sync_state for Rich polling
- Add first-observed cursor tracking in _poll_until_complete_with_rich
- Implement 3-tier fallback in compute_stream_progress:
  1. Previous completed sync's cursor (from cached pre-sync state)
  2. First-observed cursor during polling (for first-ever syncs)
  3. sync_start_time (for real-time cursor sources)
- Add _update_first_seen_cursors helper for polling loop
When the current cursor is behind the previous bookmark (e.g. GA4 re-processing from an earlier date), compute progress as:

(cursor - first_seen) / (prev_bookmark - first_seen)

This correctly shows advancing progress for connectors that start their cursor at a historical date and work forward toward the previous bookmark. Extract _compute_progress_pct helper to keep the return count within the ruff PLR0911 limit.
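A minimal sketch of the behind-bookmark formula; `backfill_progress` is an illustrative name (the PR's actual helper is `_compute_progress_pct`):

```python
from datetime import datetime, timezone


def backfill_progress(
    cursor: datetime,
    first_seen: datetime,
    prev_bookmark: datetime,
) -> float:
    """(cursor - first_seen) / (prev_bookmark - first_seen), clamped to [0, 1]."""
    denom = (prev_bookmark - first_seen).total_seconds()
    if denom <= 0:
        return 0.0  # degenerate window
    return max(0.0, min(1.0, (cursor - first_seen).total_seconds() / denom))


UTC = timezone.utc
first_seen = datetime(2025, 1, 1, tzinfo=UTC)
bookmark = datetime(2026, 1, 1, tzinfo=UTC)
cursor = datetime(2025, 7, 2, tzinfo=UTC)  # roughly halfway through the window
print(round(backfill_progress(cursor, first_seen, bookmark), 3))  # 0.499
```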
logger = logging.getLogger(__name__)


def _try_parse_datetime_cursor(value: str) -> datetime | None:
The CDK already has something that should do this. Something like `ab_datetime_parse`.
Fixed in commit afd150c. Now delegates to the CDK's `ab_datetime_try_parse()` from `airbyte_cdk.utils.datetime_helpers`.
Kept the numeric pre-filter because `ab_datetime_try_parse('12345')` interprets pure numbers as epoch timestamps (returns `1970-01-01T03:25:45+00:00`), but cursor values like `"12345"` are opaque tokens we should skip.
airbyte/cloud/_sync_progress.py
Outdated
If `cursor_field` is provided, looks for it directly. Otherwise,
attempts common cursor field names and heuristics.

Returns the cursor value as a string, or `None` if not found.
This approach appears to be throwing spaghetti at the wall. Unless we know each of these is a valid algorithm "in the wild" for a real source, we should probably remove or simplify.
Agreed — removed all the heuristic fallbacks in commit afd150c. The function now requires an explicit cursor_field from the configured catalog. If cursor_field is None (not in the catalog), it returns None immediately — no more guessing with common_cursor_names or single-datetime-value heuristics.
airbyte/cloud/_sync_progress.py
Outdated
Baseline selection uses a tiered fallback:

1. Previous completed sync's cursor from ``previous_state_data``.
reST docstrings not permitted. Please create a new PR to port over the docstring validation process recently added there.
Fixed in commit afd150c — converted all double backticks to single backticks (markdown format) throughout the file.
Regarding porting the docstring validation process: I'll create a separate follow-up PR for that since it's a broader repo-level concern beyond this feature PR.
airbyte/cloud/_sync_progress.py
Outdated
if previous_state_data:
    prev_cursor_map = _build_previous_cursor_map(previous_state_data, catalog_data)

try:
No blanket try permitted
Fixed in commit afd150c. Replaced both blanket try/except blocks with upfront isinstance guard statements:
- `_build_previous_cursor_map`: guards with `if not isinstance(previous_state_data, dict): return result`
- `compute_stream_progress`: guards with `if not isinstance(state_data, dict): ... return []`

Also removed the now-unused `from pydantic import ValidationError` import.
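The guard-statement style can be sketched as follows. This is an illustrative stand-in, not the PR's implementation; the nested key names (`streamStates`/`streamDescriptor`/`streamState`/`cursor`) are assumptions about the payload shape:

```python
def build_previous_cursor_map(previous_state_data: object) -> dict[str, str]:
    """Map stream name -> previous cursor value, using upfront isinstance
    guards instead of wrapping the whole body in a blanket try/except."""
    result: dict[str, str] = {}
    if not isinstance(previous_state_data, dict):
        return result  # guard: unexpected payload shape
    entries = previous_state_data.get("streamStates")
    if not isinstance(entries, list):
        return result
    for entry in entries:
        if not isinstance(entry, dict):
            continue  # skip malformed entries rather than raising
        descriptor = entry.get("streamDescriptor")
        state = entry.get("streamState")
        if not isinstance(descriptor, dict) or not isinstance(state, dict):
            continue
        name = descriptor.get("name")
        cursor = state.get("cursor")
        if isinstance(name, str) and isinstance(cursor, str):
            result[name] = cursor
    return result
```

The upfront checks make each failure mode explicit, where a blanket `try/except` would silently swallow unrelated bugs.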
…statements, markdown docstrings

1. Replace custom datetime parsing with the CDK's `ab_datetime_try_parse`
2. Remove spaghetti fallback heuristics from `_find_cursor_value_in_state` (now requires explicit `cursor_field` from the catalog)
3. Replace blanket try/except with guard statements (isinstance checks)
4. Convert reST double-backtick docstrings to markdown single-backtick
…tation

The Config API returns streams under `syncCatalog.streams`, but `_extract_cursor_field_from_catalog` was looking for `catalog.streams` at the top level. This caused `cursor_field` to always be `None`, preventing `first_seen_cursors` from populating and blocking the historical backfill progress path. Now handles both raw catalog and full connection response formats.
Summary
Adds two major features for monitoring Cloud sync progress:
1. Per-stream sync progress estimation (`with_stream_progress`)

Adds a `with_stream_progress` boolean parameter to the `get_cloud_sync_status` MCP tool. When enabled, it fetches the current connection state and configured catalog, then computes an estimated sync progress percentage for each stream that uses a datetime-based cursor.

Core business logic lives in a new public module `airbyte/cloud/_sync_progress.py` (keeping the MCP tool as a thin wrapper). The progress formula uses a tiered fallback for the baseline:

1. Previous completed sync's cursor (from `previous_state_data`) — most accurate
2. First-observed cursor during polling (for first-ever syncs)
3. `sync_start_time` (wall-clock job start) — works for real-time incremental, yields `None` for historical backfills

The feature is opt-in (default `False`) to avoid the latency cost of additional API calls when progress isn't needed.

Companion PR: An equivalent `with_progress` flag for `get_connection_state` in airbyte-ops-mcp (depends on `compute_stream_progress` from this PR).

2. Rich Live status display (`with_rich_status_updates`)

Adds `with_rich_status_updates: bool | int = False` to `wait_for_completion()` and `run_sync()`. When truthy, renders a Rich Live table to stderr showing per-stream sync progress in real time.

- Pass `True` for 15-second polling (default), or an `int` for a custom interval (minimum 15s, clamped with warning).
- When enabled, the rich interval replaces `JOB_WAIT_INTERVAL_SECS` as the sole loop cadence.
- A `ValueError` is raised if `wait=False` and `with_rich_status_updates` is truthy.

Table columns show each stream's current cursor, previous cursor, progress %, and status/reason.
Title bar shows job-level throughput metrics sourced from `JobResponse`:

- `Status: <status> | Elapsed: Xm Ys | Streams: X/Y reporting progress`
- When `rows_synced`/`bytes_synced` are reported by the API, appends: `| 1,200 records, 235.3 KB, 3.7 records/s`

Other display details:
- Caption shows `Start: YYYY-MM-DD HH:MM:SS UTC | Current: ... | Elapsed: ...`
- On `SUCCEEDED`, all streams with computed progress are overridden to 100% (the formula can show <100% for sources whose data doesn't extend to "today", e.g. GA4)
- `FAILED`/`CANCELLED` keep their actual progress values

Other changes
- `JOB_WAIT_INTERVAL_SECS` bumped from 2.0 → 5.0 (not publicly exposed)
- `_pre_sync_state` field on `SyncResult` caches the connection state before sync starts
- `_format_bytes()` helper formats byte counts as human-readable strings (B, KB, MB, GB) using SI (1000-based) units
- `examples/run_cloud_sync_with_rich_progress.py` for observing the Rich display against the `@devin-ai-sandbox` workspace

Recording of enhanced Rich table during real GA4 sync
View original video (rec-f96d41356a684e33bea7e78438f0eff0-edited.mp4)
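SI-based byte formatting as described under "Other changes" can be sketched as follows. A minimal stand-in under the stated assumptions (1000-based units, one decimal above bytes), not the PR's actual `_format_bytes()`:

```python
def format_bytes(num_bytes: int) -> str:
    """Human-readable byte count using SI (1000-based) units: B, KB, MB, GB."""
    value = float(num_bytes)
    for unit in ("B", "KB", "MB"):
        if value < 1000:
            # Whole bytes need no decimal; larger units get one decimal place.
            return f"{int(value)} B" if unit == "B" else f"{value:.1f} {unit}"
        value /= 1000
    return f"{value:.1f} GB"
```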
Review & Testing Checklist for Human
- No unit tests for `_sync_progress.py` or `_format_bytes()`: The progress math (tiered fallback, historical backfill path, clamping) and byte formatting have no test coverage. This is the highest-risk area — consider whether the cursor parsing, baseline selection, and edge cases (negative numerator, zero denominator) warrant tests before merging.
- `JobResponse.rows_synced`/`bytes_synced`: These fields are `Optional[int]` and may remain `None` for the entire duration of some syncs (connector/destination dependent). The code guards with `(job_info.rows_synced or 0) if job_info else 0` — verify this gracefully degrades to showing no throughput rather than erroring.
- Throughput rate is computed as `rows_synced / elapsed_secs`, where elapsed starts from job creation, not from when records begin flowing. This means the displayed rate will be lower than the true ingestion rate, especially early in the sync. Verify this is acceptable for order-of-magnitude guidance.
- On `SUCCEEDED`, the override only applies to streams where `progress_pct is not None`. Streams that couldn't compute progress (no cursor, non-datetime cursor) still show `None`. Verify this is the desired behavior vs. showing 100% for all streams unconditionally.
- `selected` filter: The stream count denominator filters by `config.selected == True`. If no streams have `selected=True` (unexpected API shape), it falls back to total stream count. Verify this fallback is safe and that `config.selected` is reliably present in `web_backend/connections/get` responses.
- `examples/run_cloud_sync_with_rich_progress.py` has workspace/connection IDs for `@devin-ai-sandbox`. Confirm these should be committed.

Suggested test plan: Run `uv run python examples/run_cloud_sync_with_rich_progress.py` against a connection with 2-3 datetime-cursor streams. Observe: (1) denominator matches selected stream count from the start, (2) caption timestamps update each poll, (3) progress advances as streams checkpoint, (4) throughput metrics appear in the title bar once the API reports `rows_synced`/`bytes_synced`, (5) final display shows 100% for all streams on SUCCEEDED with total record count and throughput in the title.

Notes
- `_get_stream_list` (private) is imported from `_connection_state.py` — acceptable since both are internal modules under `airbyte/cloud/`.
- The historical backfill path (`cursor <= prev_bookmark`) uses a separate formula: `(cursor - first_seen) / (prev_bookmark - first_seen)`.
- `_format_bytes()` uses SI units (1000-based: KB, MB, GB), not binary units (1024-based: KiB, MiB, GiB).

Link to Devin session: https://app.devin.ai/sessions/8f7adf0cebe34d62a2c2d77860c79cb4
Requested by: Aaron ("AJ") Steers (@aaronsteers)