
feat: Addressing bug in load_cdc_records, adding test suite#126

Open
ealexa05 wants to merge 7 commits into main from fix-cdc-update-functions

Conversation

Contributor

@ealexa05 ealexa05 commented Apr 6, 2026

Background

ods_fact.py has historically maintained three separate accumulators (insert_df, update_df, delete_df) for CDC operations, built incrementally via cdc_to_fact(). This interleaved approach made it difficult to reason about conflicts between operations. Combined with a complete lack of unit tests, it led to a bug where keys that were updated and subsequently deleted within a single update batch were reinserted after the delete, which we saw impact the data in the unsettled transactions fact tables.

Solution

This PR does two things to address the bug and prevent similar errors from being reintroduced in the future:

  1. Introduces a simplified structure for load_cdc_records(): all CDC records are accumulated as raw data first, then each key is resolved to its final operation (I/U/D) in one pass.
  2. Introduces a test suite (in collaboration with @MaxAlex).

Here is the order of operations for the new structure (per the new docstring):

Order of operations:
  1. Read the current fact table and its max header__change_seq.
  2. Pull CDC records (I/U/D) from history with seq > max_fact_seq.
     Repeat in a loop to accumulate enough work.
  3. For every key touched by CDC, determine the FINAL operation:
     sort all CDC records by header__change_seq descending, deduplicate
     by key keeping the latest. The latest oper wins.
  4. Build a single "new_rows" dataframe:
     - For keys whose final op is I: use the I record directly.
     - For keys whose final op is U: fetch the existing fact row, apply
       the sparse column updates, produce an updated row.  If no fact
       row exists (I→U in same batch), use the I record as the base.
     - For keys whose final op is D: no new row (just drop the old one).
  5. Collect odin_index values of all fact rows being replaced or deleted
     (any key in the CDC batch that already exists in fact_ds).
  6. Write: filter old dataset to exclude touched rows, union with new_rows,
     upload to S3.

Important to note: because updates are sparse (an update need not touch every column), when the final operation is U, all updates for that key are applied in sequence. If a key is inserted and updated within a single batch, the insert acts as the base for the subsequent updates. When the final operation is I or D, only that last operation needs to be applied.

Post-deploy operations

  • Audit tables to determine which were affected
  • Trigger a full snapshot reload for each affected table via migration
  • Update downstream users on status

@ealexa05 ealexa05 requested a review from MaxAlex April 6, 2026 21:53
```python
how="diagonal",
).cast(orig_cast)

if existing_rows.height > 0:
```
Contributor

Code style opinion: some of these conditionals are best left out, for the sake of readability and consistency. For example, here there isn't a special case where existing_rows is non-empty that needs particular handling; rather, there's an operation that will always be done to the contents of existing_rows, and it's a no-op if existing_rows is empty. Letting the same code run on both empty and non-empty data is simpler to reason about and gives fewer opportunities for edge-case-specific bugs to come up.

I think similar arguments can be made for the conditionals on 397, 426, 437, 460 and 466, assuming the respective code behaves as expected on empty data. (429 gets a pass because it looks like that does an operation on cdc_df that would be expensive even if missing_keys is empty.)

In this case, also, if you take out this conditional (and the one on 397) you can also take out the separate definition of new_update_rows on 396.

```python
cdc_seq_max=str(cdc_df.get_column("header__change_seq").max()),
all_cdc_frames: list[pl.DataFrame] = []
current_min_seq = max_fact_seq
max_load_records = 10_000
```
Contributor

Code nit: Constants should be moved to the top of the file and given an all-caps name.

Contributor Author

Made a MAX_LOAD_RECORDS constant, from which max_load_records is loaded (since max_load_records itself is not a constant)
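A minimal sketch of the convention being asked for, with the value taken from the diff above (the helper name is hypothetical):

```python
# The all-caps constant lives at module top level.
MAX_LOAD_RECORDS = 10_000


def load_records_budget() -> int:  # hypothetical helper name
    # The function-local name, if one is still wanted, reads from the
    # constant rather than hard-coding the value.
    max_load_records = MAX_LOAD_RECORDS
    return max_load_records
```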

```python
all_cdc_frames: list[pl.DataFrame] = []
current_min_seq = max_fact_seq
max_load_records = 10_000
for _ in range(11):
```
Contributor

Similarly: do we know why this is 11?

Contributor Author

This is pretty arbitrary, but seemed to be working ok before

```python
_, max_odin_index = ds_metadata_min_max(fact_ds, "odin_index")

# Log initial fact table state
init_log = ProcessLog(
```
Contributor

It's a pre-existing issue, but while we're fixing this function we can also fix how ProcessLog is being misused here. There should be one ProcessLog for the whole function, with further .add_metadata(), .complete(), and .fail() calls as appropriate.

Contributor Author

Good call, I consolidated the existing log functions into one
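The pattern being described looks roughly like the following. Only the method names (add_metadata, complete, fail) come from the comment; the ProcessLog class here is a minimal stub, and the skeleton function is invented.

```python
# Minimal stub standing in for the real ProcessLog class; only the
# method names come from the review comment above.
class ProcessLog:
    def __init__(self, name: str):
        self.name, self.metadata, self.status = name, {}, "started"

    def add_metadata(self, **kwargs):
        self.metadata.update(kwargs)

    def complete(self):
        self.status = "complete"

    def fail(self, exc: Exception):
        self.status = f"failed: {exc}"


def load_cdc_records_skeleton() -> ProcessLog:
    # One ProcessLog per function, enriched and resolved as work proceeds.
    log = ProcessLog("load_cdc_records")
    try:
        log.add_metadata(max_fact_seq="0")
        # ... actual CDC work would happen here ...
        log.complete()
    except Exception as exc:
        log.fail(exc)
        raise
    return log
```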

```python
# Load fact dataset and get current max sequence
s3_objects = list_objects(f"s3://{self.s3_export}/", in_filter=".parquet")
# --- Step 1: Load current fact table state ---
fact_ds = ds_from_path(f"s3://{self.s3_export}/")
```
Contributor

Nit: should use the S3 directory path utility function from earlier.

Contributor Author

Fixed this to use your S3 directory path utility function
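The utility in question is not shown in this excerpt, but the idea is a single helper that builds the `s3://` prefix so callers stop repeating the f-string. The name and signature here are invented:

```python
# Hypothetical illustration of an S3 directory path helper; the real
# utility's name and signature may differ.
def s3_dir(bucket_prefix: str) -> str:
    """Build a normalized s3:// directory URI with a trailing slash."""
    return f"s3://{bucket_prefix.rstrip('/')}/"
```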
