@@ -119,6 +119,9 @@ class JoinSourceRunner(groupByConf: api.GroupBy, conf: Map[String, String] = Map
  // Micro batch repartition size - when set to 0, we won't do the repartition
  private val microBatchRepartition: Int = getProp("batch_repartition", "0").toInt

+ // Chunk size for fetchBaseJoin calls - when set to 0, we won't chunk
+ private val fetchChunkSize: Int = getProp("fetch_chunk_size", "0").toInt
+
private case class PutRequestHelper(inputSchema: StructType) extends Serializable {
@transient implicit lazy val logger = LoggerFactory.getLogger(getClass)
private val keyIndices: Array[Int] = keyColumns.map(inputSchema.fieldIndex)
@@ -435,10 +438,19 @@ class JoinSourceRunner(groupByConf: api.GroupBy, conf: Map[String, String] = Map
logger.info(logMessage)
}

- val responsesFuture = fetcher.fetchBaseJoin(requests, Option(joinSource.join))
  // this might be potentially slower, but spark doesn't work when the internal derivation functionality triggers
  // its own spark session, or when it passes around objects
- val responses = Await.result(responsesFuture, 5.second)
+ val responses = if (fetchChunkSize > 0 && requests.length > fetchChunkSize) {
+   implicit val ec = fetcher.executionContext
+   val chunks = requests.grouped(fetchChunkSize).toSeq
+   context.distribution("chain.fetch_chunk.count", chunks.size)
+   val chunkFutures = chunks.map(chunk => fetcher.fetchBaseJoin(chunk, Option(joinSource.join)))
+   val combinedFuture = Future.sequence(chunkFutures).map(_.flatten)
+   Await.result(combinedFuture, 5.second)
+ } else {
+   val responsesFuture = fetcher.fetchBaseJoin(requests, Option(joinSource.join))
+   Await.result(responsesFuture, 5.second)
+ }

Review comments on this change:

Collaborator: let's try to use the joinFetchParallelChunkSize from https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Fetcher.scala#L101C15-L101C41 so you don't have to implement the chunking here again.

Collaborator (Author): Hi @pengyu-hou, using joinFetchParallelChunkSize would require some changes in Api.buildFetcher, since joinFetchParallelChunkSize is currently not passed into that function. Also, the chaining uses fetchBaseJoin instead of fetchJoin. The duplicate chunking is just three lines of code, so it might be fine to have some duplication here and avoid changing too much code. Let me know what you think.

// debug print payload for requests and responses
if (debug && shouldSample) {
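The chunked-fetch pattern in the diff (split the request list, fan out one future per chunk, combine with Future.sequence, flatten) can be sketched standalone. This is a minimal sketch, not the PR's code: fetchBatch is a hypothetical stand-in for fetcher.fetchBaseJoin, and the request/response types are simplified.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ChunkedFetchSketch {
  // Hypothetical stand-in for fetcher.fetchBaseJoin: resolves one batch of requests.
  def fetchBatch(batch: Seq[Int])(implicit ec: ExecutionContext): Future[Seq[String]] =
    Future(batch.map(id => s"resp-$id"))

  // Mirrors the diff's shape: chunk only when a positive chunk size is configured
  // and the request list is larger than one chunk; otherwise fetch in a single call.
  def fetchChunked(requests: Seq[Int], chunkSize: Int)(implicit ec: ExecutionContext): Seq[String] =
    if (chunkSize > 0 && requests.length > chunkSize) {
      val chunks = requests.grouped(chunkSize).toSeq
      // Fan out: one in-flight future per chunk.
      val chunkFutures = chunks.map(chunk => fetchBatch(chunk))
      // Future.sequence preserves chunk order, so flattening restores request order.
      Await.result(Future.sequence(chunkFutures).map(_.flatten), 5.seconds)
    } else {
      Await.result(fetchBatch(requests), 5.seconds)
    }
}
```

Two properties of this shape worth noting: Future.sequence fails fast if any chunk's future fails, and the single Await timeout covers all chunks combined, matching the unchunked path's timeout semantics.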