
[FLINK-39386] For Flink 2.x+, support limit the rate at the source#4362

Open
sd4324530 wants to merge 5 commits into apache:master from sd4324530:FLINK-39386

Conversation

Contributor

@sd4324530 sd4324530 commented Apr 6, 2026

Flink CDC now supports Flink 2.x+, and I think we can add this very useful feature.

See: https://cwiki.apache.org/confluence/display/FLINK/FLIP-535%3A+Introduce+RateLimiter+to+Source

This PR adds the connector option records.per.second. It only takes effect when running on Flink 2.x.

boolean appendOnly =
        config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);

double recordsPerSecond = config.get(MySqlSourceOptions.RECORDS_PER_SECOND);

[Critical] RECORDS_PER_SECOND is not registered in optionalOptions() for all 6 SQL TableSourceFactory implementations

All 6 SQL DynamicTableSourceFactory classes (MySqlTableSourceFactory, Db2TableSourceFactory, SqlServerTableSourceFactory, OracleTableSourceFactory, PostgreSQLTableSourceFactory, MongoDBTableSourceFactory) read the value via config.get(RECORDS_PER_SECOND) but never declare it in their optionalOptions() method. Flink's FactoryUtil validates that every user-provided option is declared, so SQL/Table API users will hit an "Unsupported options found" validation error when using records.per.second in CREATE TABLE ... WITH ('records.per.second' = '100').

The Pipeline DataSourceFactory classes correctly register it via options.add(RECORDS_PER_SECOND), so the feature works for Pipeline API but is broken for SQL/Table API.

Suggested fix: Add options.add(RECORDS_PER_SECOND); to optionalOptions() in each of the 6 factory classes.

— Qwen Code /review
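To illustrate the fix, here is a minimal, self-contained sketch of the missing registration. The class and option names below are stand-ins for illustration only; in the real factories, ConfigOption comes from org.apache.flink.configuration and the set is returned from DynamicTableSourceFactory#optionalOptions().

```java
import java.util.HashSet;
import java.util.Set;

public class OptionalOptionsSketch {

    // Stand-in for Flink's ConfigOption<?> so the sketch compiles on its own;
    // the real type lives in org.apache.flink.configuration.
    record ConfigOption(String key) {}

    static final ConfigOption RECORDS_PER_SECOND = new ConfigOption("records.per.second");

    // Mirrors DynamicTableSourceFactory#optionalOptions(): FactoryUtil rejects
    // any user-supplied option that is not declared here, which is why the
    // missing registration breaks SQL/Table API users.
    static Set<ConfigOption> optionalOptions() {
        Set<ConfigOption> options = new HashSet<>();
        // ... existing options such as hostname, port, etc. ...
        options.add(RECORDS_PER_SECOND); // the one-line fix the review asks for
        return options;
    }

    public static void main(String[] args) {
        System.out.println(optionalOptions().contains(RECORDS_PER_SECOND));
    }
}
```

The same one-line addition would be repeated in each of the 6 factory classes.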

incrementalSourceReaderContext.getSourceReaderContext(),
sourceConfig.getRecordsPerSecond() == -1
        ? null
        : RateLimiterStrategy.perSecond(sourceConfig.getRecordsPerSecond()));

[Suggestion] Rate limiter applies to both snapshot AND incremental/binlog phases without distinction

The RateLimiterStrategy is passed unconditionally to the source reader, throttling ALL records regardless of split type. For CDC connectors this has a notable behavioral risk:

  • Snapshot phase: Rate limiting is typically desired to avoid overwhelming the source database.
  • Incremental/binlog phase: Rate limiting can cause the connector to fall behind the upstream change stream, risking binlog/WAL purging before the connector catches up (data loss for MySQL, replication slot issues for PostgreSQL).

The option description and documentation do not warn about this risk. Consider adding a documentation note warning users about the potential danger of rate limiting during the incremental phase.

— Qwen Code /review
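The -1-means-unlimited convention the hunk above relies on can be sketched in isolation. RateLimiterStrategy here is a local stand-in mimicking only the perSecond factory-method shape of Flink's real class in flink-connector-base; note that, as the comment warns, the resulting strategy throttles all splits, snapshot and incremental alike.

```java
public class RateLimitMappingSketch {

    // Stand-in for Flink's RateLimiterStrategy (flink-connector-base); only
    // the perSecond factory-method shape used in the hunk above is mimicked.
    interface RateLimiterStrategy {
        static RateLimiterStrategy perSecond(double recordsPerSecond) {
            return new RateLimiterStrategy() {};
        }
    }

    // Mirrors the PR's convention: records.per.second == -1 (the default)
    // disables throttling by handing the source reader a null strategy.
    static RateLimiterStrategy strategyFor(double recordsPerSecond) {
        return recordsPerSecond == -1
                ? null
                : RateLimiterStrategy.perSecond(recordsPerSecond);
    }

    public static void main(String[] args) {
        System.out.println(strategyFor(-1) == null);  // true: no limit
        System.out.println(strategyFor(100) != null); // true: throttled
    }
}
```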

        .doubleType()
        .defaultValue(-1d)
        .withDescription(
                "The maximum size of data processed per second, the default value: -1, not limited");

[Suggestion] Option description says "maximum size of data" but should say "maximum number of records"

The description says "The maximum size of data processed per second, the default value: -1, not limited" but the option is named records.per.second and controls record count, not byte size. This misleading description is replicated across all 6 *Options classes.

Suggested fix: Change to "The maximum number of records emitted per second. A negative value (default: -1) means no rate limiting."

— Qwen Code /review
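With the suggested wording, the full declaration in MySqlSourceOptions (and its 5 siblings) might look like the fragment below, using Flink's ConfigOptions builder; the surrounding class and exact formatting are assumptions, not code from the PR.

```java
public static final ConfigOption<Double> RECORDS_PER_SECOND =
        ConfigOptions.key("records.per.second")
                .doubleType()
                .defaultValue(-1d)
                .withDescription(
                        "The maximum number of records emitted per second. "
                                + "A negative value (default: -1) means no rate limiting.");
```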


@wenshao left a comment


Request changes — see inline comments.

Summary of findings:

  1. [Critical] RECORDS_PER_SECOND not registered in optionalOptions() — SQL/Table API users will get validation errors. Affects all 6 SQL TableSourceFactory classes.
  2. [Suggestion] Rate limiter applies to both snapshot and incremental phases — consider warning users about the risk of falling behind the change stream during incremental reads.
  3. [Suggestion] Option description says "maximum size of data" but should say "maximum number of records" — misleading description replicated across all Option classes.

Reviewed by Qwen Code /review

@sd4324530
Contributor Author

sd4324530 commented Apr 7, 2026

@wenshao
Thank you so much for your review. I have fixed it. Please take another look when you have time.

Signed-off-by: Pei Yu <125331682@qq.com>
