
[FLINK-39321][pipeline-connector-paimon] Fix a bug where Flink CDC fails to write to Paimon dynamic bucket tables when dynamic-bucket.initial-buckets=1 and Flink parallelism > 1 #4344

Open
zhaoruifeng01 wants to merge 2 commits into apache:master from zhaoruifeng01:FLINK-39321

Conversation

@zhaoruifeng01 zhaoruifeng01 commented Mar 26, 2026

This closes FLINK-39321

What is the purpose of the change

This pull request fixes a bug where Flink CDC fails to write to Paimon dynamic bucket tables when dynamic-bucket.initial-buckets=1 and Flink parallelism > 1. The issue is caused by a parameter mismatch between the routing calculation in PaimonHashFunction (via RowAssignerChannelComputer) and the assigner validation in BucketAssignOperator (via HashBucketAssigner). When numAssigners=1, all data should route to assigner 0, but BucketAssignOperator creates HashBucketAssigner instances with different assignId values for each subtask, causing validation failures.
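To make the mismatch concrete, here is a minimal, self-contained sketch of the two sides. The class and method names below are illustrative stand-ins, not the actual Paimon or Flink CDC classes: the routing side sends every record to channel 0 when there is a single assigner, while each subtask validates against its own index.

```java
// Simplified model of the reported mismatch (illustrative names only).
public class AssignerMismatchDemo {

    // Routing side: with numAssigners = 1, every record hashes to channel 0.
    static int routeChannel(int recordHash, int numAssigners) {
        return Math.abs(recordHash % numAssigners); // always 0 when numAssigners == 1
    }

    // Validation side: each subtask i builds an assigner with assignId = i
    // and only accepts records routed to its own id.
    static boolean accepts(int assignId, int routedChannel) {
        return assignId == routedChannel;
    }

    public static void main(String[] args) {
        int numAssigners = 1;
        int parallelism = 4;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            int channel = routeChannel(42, numAssigners); // 0 for every record
            // Subtasks 1..3 reject the record: this models the validation failure.
            System.out.println("subtask " + subtask + " accepts=" + accepts(subtask, channel));
        }
    }
}
```

With parallelism 4, only subtask 0 ever accepts a record; the other three subtasks see a record routed to channel 0 but hold assignId 1, 2, or 3, which is the condition the reported IllegalArgumentException guards against.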

Brief change log

  • Modified BucketAssignOperator.java to set assignId=0 and numChannels=1 when minAssigners=1
  • This ensures that all subtasks have consistent assigner parameters when numAssigners=1
  • Aligns with the design principle that all data should route to assigner 0 when numAssigners=1
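The shape of the fix described above can be sketched as a parameter normalization step; the class and field names here are hypothetical simplifications, not the exact code in BucketAssignOperator:

```java
// Hedged sketch: when there is only one assigner, force every subtask to
// use the same (assignId, numChannels) pair that the routing side assumes.
// Names are illustrative, not the actual fields in BucketAssignOperator.
public class AssignerParams {
    final int assignId;
    final int numChannels;

    AssignerParams(int subtaskIndex, int parallelism, int numAssigners) {
        if (numAssigners == 1) {
            // All data routes to assigner 0, so every subtask must validate
            // against assignId = 0 with a single channel.
            this.assignId = 0;
            this.numChannels = 1;
        } else {
            this.assignId = subtaskIndex;
            this.numChannels = parallelism;
        }
    }

    public static void main(String[] args) {
        AssignerParams p = new AssignerParams(3, 4, 1);
        System.out.println("assignId=" + p.assignId + " numChannels=" + p.numChannels);
    }
}
```

Under this normalization, subtask 3 of a parallelism-4 job still constructs an assigner with assignId=0 when numAssigners=1, matching the routing computed by RowAssignerChannelComputer.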

Verifying this change

Manually verified the change by:

  1. Creating a Paimon table with dynamic bucket mode (bucket=-1) and dynamic-bucket.initial-buckets=1
  2. Configuring Flink CDC pipeline with parallelism=4
  3. Starting the Flink CDC job to write data to the Paimon table
  4. Verifying that data is written successfully without the IllegalArgumentException
  5. Verifying that all data routes to subtask 0 when numAssigners=1

Does this pull request potentially affect one of the following parts

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

