[FLINK-39169][mysql-cdc-connector] feat: Add scan.snapshot.hostname option to route snapshot reads to a read replica#4357
[FLINK-39169][mysql-cdc-connector] feat: Add scan.snapshot.hostname option to route snapshot reads to a read replica#4357locchipinti wants to merge 17 commits intoapache:masterfrom
Conversation
This reverts commit d5b6704.
There was a problem hiding this comment.
Pull request overview
Adds support for routing MySQL snapshot data reads and metadata queries (discovery/chunk splitting) to a configurable read-replica host (scan.snapshot.hostname), while keeping binlog-related operations on the primary writer. This reduces writer load during snapshotting in replica-capable deployments (e.g., Aurora/RDS), with an explicit at-least-once tradeoff.
Changes:
- Introduces
snapshotHostnameplumbing across config/builder + pipeline connector optionscan.snapshot.hostname. - Splits snapshot execution to use separate primary vs snapshot connections (including pooled JDBC routing).
- Adds/updates tests and documentation for the new routing behavior and semantics.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java | Adds snapshotHostname to runtime config. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java | Adds factory setter and wires snapshot hostname into created config. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java | Exposes builder API for snapshotHostname. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java | Adds snapshot-specific JDBC/MySqlConnection creation/opening helpers. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java | Routes enumerator metadata discovery to snapshot JDBC connection. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java | Routes table discovery to snapshot JDBC connection. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java | Routes chunk splitting JDBC access to snapshot connection. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionFactory.java | Adds hostname override support for pooled JDBC connections. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java | Adds overload to build pools with a hostname override. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java | Supports building a pool/JDBC URL with a provided hostname. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/ConnectionPoolId.java | Adds getters used by new pool routing logic/tests. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java | Holds both primary and snapshot connections; closes safely. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java | Creates/propagates dual connections into snapshot task execution. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java | Uses primary for binlog watermarks and snapshot connection for SELECT scans. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java | New unit tests for snapshot hostname routing basics. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java | New tests for dual-connection handling and close() behavior. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtilsTest.java | Extends tests for snapshot connection creation and config plumbing. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml | Adds Mockito test dependency (but introduces duplication/version conflict). |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java | Adds pipeline option scan.snapshot.hostname (description needs spacing fixes). |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java | Wires the new option into MySqlSourceConfigFactory. |
| docs/content/docs/connectors/pipeline-connectors/mysql.md | Documents the new option and its at-least-once semantics caveat. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
Outdated
Show resolved
Hide resolved
...mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
Outdated
Show resolved
Hide resolved
.../test/java/org/apache/flink/cdc/connectors/mysql/source/connection/ReaderConnectionTest.java
Outdated
Show resolved
Hide resolved
...pache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTaskConnectionTest.java
Outdated
Show resolved
Hide resolved
.../main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
Outdated
Show resolved
Hide resolved
- Remove duplicate mockito-core dependency (hardcoded 3.4.6) from pom.xml,
keep single entry using ${mockito.version} (3.12.4)
- Fix missing spaces in withDescription() string concatenations in
MySqlDataSourceOptions for SCAN_SNAPSHOT_HOSTNAME option
- Add getHostnameOverride() package-private getter to JdbcConnectionFactory
and strengthen testJdbcConnectionFactoryUsesHostnameOverride() to assert
the override hostname is correctly stored rather than just non-null
- Fix comment in MySqlSnapshotSplitReadTaskConnectionTest referencing
getReaderConnection() → getSnapshotConnection()
- Normalize empty string snapshotHostname to null in MySqlSourceConfigFactory
so downstream != null guards in DebeziumUtils work correctly
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...or-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
Show resolved
Hide resolved
| MySqlSourceConfig config = createConfig(PRIMARY_HOSTNAME, null); | ||
| BinaryLogClient binaryLogClient = mock(BinaryLogClient.class); | ||
|
|
||
| // When no reader hostname is configured, createReaderMySqlConnection falls back to writer |
There was a problem hiding this comment.
The comment refers to createReaderMySqlConnection, but the API being used is DebeziumUtils.createSnapshotMySqlConnection(...). Updating the wording will avoid confusion when maintaining these tests (especially since the production API is consistently named around “snapshot”).
| // When no reader hostname is configured, createReaderMySqlConnection falls back to writer | |
| // When no reader hostname is configured, createSnapshotMySqlConnection falls back to writer |
Motivation
In high-throughput environments backed by AWS Aurora / RDS (or any MySQL setup with read replicas), running snapshot scans against the primary writer instance increases its I/O load significantly.
This change introduces a
scan.snapshot.hostnameconfiguration option that routes all snapshot data reads and metadata queries (table discovery, chunk splitting) to a designated read replica, while keeping all binlog-based operations on the primary writer.Changes
New configuration option
scan.snapshot.hostname(MySqlSourceConfig, MySqlSourceConfigFactory, MySqlSourceBuilder, pipeline connector MySqlDataSourceOptions): optional hostname of a MySQL read replica used exclusively for snapshot reads and metadata queries. When not set, all operations fall back to the primary.Connection routing
Split-level dual-connection handling (MySqlSnapshotSplitReadTask, StatefulTaskContext)
Connection pool routing (JdbcConnectionFactory, JdbcConnectionPools, PooledDataSourceFactory)
Semantics
scan.snapshot.hostnameis set, exactly-once semantics cannot be guaranteed.Binlog watermark positions are recorded from the primary writer, but snapshot data is read from the replica.
Aurora/RDS storage replication lag (typically milliseconds) means these may not be perfectly consistent, potentially causing duplicate records during changelog replay.
This is an accepted trade-off for reduced primary load, as discussed with @leonardBang
Tests
Documentation
scan.snapshot.hostnamerow.Read Replica Snapshotexplaining the feature, at-least-once caveat, and an AWS Aurora/RDS example.