Skip to content

Support chunking for fetchBaseJoin in chaining streaming path#1090

Open
yuli-han wants to merge 2 commits intomainfrom
ylh--support-chunking-chaining
Open

Support chunking for fetchBaseJoin in chaining streaming path#1090
yuli-han wants to merge 2 commits intomainfrom
ylh--support-chunking-chaining

Conversation

@yuli-han
Copy link
Copy Markdown
Collaborator

@yuli-han yuli-han commented Feb 19, 2026

Summary

  • Add configurable chunking for fetchBaseJoin calls in JoinSourceRunner.enrichBaseJoin to prevent KV store overload or timeouts when micro-batches are large
  • When spark.chronon.stream.chain.fetch_chunk_size is set to a positive integer, requests are split into chunks and fetched in parallel as separate Futures
  • Default is 0 (no chunking), preserving current behavior

Why / Goal

Test Plan

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested

Checklist

  • Documentation update

Reviewers

@hzding621 @pengyu-hou @Shiyinghaha

// this might be potentially slower, but spark doesn't work when the internal derivation functionality triggers
// its own spark session, or when it passes around objects
val responses = Await.result(responsesFuture, 5.second)
val responses = if (fetchChunkSize > 0 && requests.length > fetchChunkSize) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's try to use the joinFetchParallelChunkSize from https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Fetcher.scala#L101C15-L101C41 so you don't have to implement the chunking here again.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @pengyu-hou using joinFetchParallelChunkSize would require some changes in the Api.buildFetcher since currently the joinFetchParallelChunkSize is not passed in this function. Also the chaining is using fetchBaseJoin instead of fetchJoin.
The duplicate chunking is just 3 lines of code so I guess it might be fine to have some duplication here and avoid changing too many code? Lemme know what you think

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants