[FLINK-39252][pipeline-connector/sqlserver] Add SQL Server pipeline connector#4320

Open

Daishuyuan wants to merge 3 commits into apache:master from Daishuyuan:FLINK-39252

Conversation

@Daishuyuan Daishuyuan commented Mar 17, 2026

What is changed

  • Add a new SQL Server pipeline connector module with a source provider, event deserializer, metadata accessor, schema/type utilities, and factory wiring
  • Integrate the new module into the pipeline connectors build and register the connector factory service
  • Add test coverage for full types, metadata access, online schema migration, parallel snapshot, newly added tables, and table pattern matching
  • Add the source-connector-side updates needed by the pipeline connector integration

Why

Flink CDC currently has a source SQL Server connector but does not provide the corresponding pipeline connector. This change adds SQL Server pipeline connector support so SQL Server can be used in pipeline jobs consistently with other databases.

Author

Daishuyuan commented Mar 19, 2026

Hi @lvyanquan, @loserwang1024, could you help review this PR when you have time?

I have addressed the previous comment and updated the SQL Server pipeline connector implementation and tests accordingly. Thanks.

@Daishuyuan
Author

Hi @yuxiqian, @leonardBang, this PR adds the SQL Server pipeline connector and also includes several source-side SQL Server changes. Could you help take a look when you have time? Thanks.

@leonardBang
Contributor

> Hi @yuxiqian, @leonardBang, this PR adds the SQL Server pipeline connector and also includes several source-side SQL Server changes. Could you help take a look when you have time? Thanks.

@Daishuyuan Thanks for the contribution! But we're approaching code freeze soon and don't have enough committers to help review this PR. Considering this feature was not included in the 3.6 roadmap, we'd like to deliver it in the next version.

@Daishuyuan Daishuyuan requested a review from zml1206 March 26, 2026 06:21
@leonardBang leonardBang requested review from Copilot and removed request for zml1206 April 7, 2026 03:16
Contributor

Copilot AI left a comment


Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds a new SQL Server pipeline connector to Flink CDC and extends SQL Server CDC source/pipeline integration to support additional behaviors like timestamp startup, metadata access, schema/type utilities, and broader integration test coverage.

Changes:

  • Added SQL Server pipeline connector module and registered its DataSourceFactory via service loader + build wiring.
  • Added "timestamp" startup mode support (table factory + pipeline datasource) and implemented timestamp→LSN offset creation for SQL Server.
  • Added extensive unit/integration tests for metadata access, full type coverage, schema migration, parallel snapshot behavior, newly added tables, and table pattern matching.

Reviewed changes

Copilot reviewed 35 out of 35 changed files in this pull request and generated 12 comments.

Summary per file (file — description):
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java Adds table-factory tests for timestamp startup mode validation.
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactoryTest.java Adds unit coverage for timestamp startup option propagation.
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/table/SqlServerTableFactory.java Adds timestamp startup option parsing + option registration.
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/offset/LsnFactory.java Implements timestamp→LSN mapping and logging for timestamp startup.
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java Accepts TIMESTAMP and SNAPSHOT startup modes; wires scanNewlyAddedTableEnabled.
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfig.java Plumbs scanNewlyAddedTableEnabled into base config.
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceBuilder.java Passes source config into LsnFactory to enable timestamp offset creation.
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java Adds handling for SQL Server error code 313 during CDC window queries.
flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml Adds new pipeline connector module to the build.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/resources/log4j2-test.properties Test logging configuration for the new module.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/resources/ddl/inventory.sql SQL Server test dataset for inventory scenarios.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/resources/ddl/column_type_test.sql SQL Server test dataset for full type coverage.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/testutils/SqlServerSourceTestUtils.java Adds shared test utilities for result collection + polling.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/testutils/RecordDataTestUtils.java Adds test helper for extracting RecordData fields with type-aware getters.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerTablePatternMatchingTest.java Adds tests for include/exclude table pattern matching.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerPipelineNewlyAddedTableITCase.java Adds IT coverage for scanning newly added tables across savepoint restore.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerPipelineITCaseTest.java Adds pipeline source IT cases for startup modes and metadata.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerParallelizedPipelineITCase.java Adds mini-cluster pipeline IT tests for parallel snapshot + streaming.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerOnlineSchemaMigrationITCase.java Adds IT coverage for online schema changes (add/alter/drop column).
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerMetadataAccessorITCase.java Adds IT coverage for namespaces/schemas/tables/schema retrieval.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerFullTypesITCase.java Adds IT coverage for reading and validating a broad SQL Server type set.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerEventDeserializerTest.java Adds unit tests for schema-change event deserialization.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/test/java/org/apache/flink/cdc/connectors/sqlserver/factory/SqlServerDataSourceFactoryTest.java Adds unit tests for factory options, pattern matching, and timestamp startup validation.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory Registers SQL Server pipeline factory via service loader.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/utils/SqlServerTypeUtils.java Adds Debezium column → Flink CDC type conversion utilities.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/utils/SqlServerSchemaUtils.java Adds schema/table discovery, schema conversion, SQL Server Agent validation.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/SqlServerPipelineRecordEmitter.java Emits CreateTableEvent(s) appropriately for pipeline mode + caches schemas.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSchemaDataTypeInference.java Adds SQL Server-specific schema inference type (currently default behavior).
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerPipelineSource.java Adds pipeline source variant using the pipeline record emitter.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerMetadataAccessor.java Adds MetadataAccessor for SQL Server pipeline connector.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerEventDeserializer.java Adds pipeline deserializer for data changes + schema changes + metadata.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerDataSourceOptions.java Adds pipeline connector configuration options.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerDataSource.java Adds SQL Server pipeline DataSource wiring deserializer/source/metadata accessor.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/factory/SqlServerDataSourceFactory.java Adds factory: validation, option parsing, table pattern resolution, startup modes.
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/pom.xml New module POM with runtime/test dependencies and shading config.


Comment on lines +59 to +61
return SqlServerSchemaUtils.listSchemas(sourceConfig, namespace);
}


Copilot AI Apr 7, 2026


listSchemas(@Nullable String namespace) forwards a nullable value into SqlServerSchemaUtils.listSchemas(...), which currently formats SQL using quote(namespace) and will NPE (or query [null]). Handle namespace == null explicitly (e.g., default to the configured database from sourceConfig.getDatabaseList() or throw a ValidationException with a clear message).

Suggested change
-    return SqlServerSchemaUtils.listSchemas(sourceConfig, namespace);
+    return SqlServerSchemaUtils.listSchemas(sourceConfig, resolveNamespace(namespace));
 }
+
+private String resolveNamespace(@Nullable String namespace) {
+    if (namespace != null) {
+        return namespace;
+    }
+    List<String> configuredDatabases = sourceConfig.getDatabaseList();
+    if (configuredDatabases != null && !configuredDatabases.isEmpty()) {
+        return configuredDatabases.get(0);
+    }
+    throw new IllegalArgumentException(
+            "Namespace must not be null when listing SQL Server schemas and no "
+                    + "database is configured in the source configuration.");
+}

Author


done.

Comment on lines +333 to +348
private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T sideEvent) {
    List<T> result = new ArrayList<>(size);
    List<T> sideResults = new ArrayList<>();
    while (size > 0 && iter.hasNext()) {
        T event = iter.next();
        if (sideEvent.getClass().isInstance(event)) {
            sideResults.add(sideEvent);
        } else {
            result.add(event);
            size--;
        }
    }
    // Also ensure we've received at least one or many side events.
    assertThat(sideResults).isNotEmpty();
    return result;
}

Copilot AI Apr 7, 2026


When a side event is encountered, the code adds the sideEvent parameter rather than the actual event read from the iterator (sideResults.add(sideEvent)). This is logically incorrect and can hide issues if the test later inspects side events. Add the actual event instead.

Author


done.
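A dependency-free sketch of the corrected helper, with the one-line fix applied (the generic signature mirrors the snippet above; the demo `main`, its sample events, and the plain `AssertionError` in place of AssertJ's `assertThat` are illustrative assumptions):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class FetchResultsExcept {

    // Corrected per the review: record the actual event read from the
    // iterator, not the sideEvent template parameter.
    static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T sideEvent) {
        List<T> result = new ArrayList<>(size);
        List<T> sideResults = new ArrayList<>();
        while (size > 0 && iter.hasNext()) {
            T event = iter.next();
            if (sideEvent.getClass().isInstance(event)) {
                sideResults.add(event); // was: sideResults.add(sideEvent)
            } else {
                result.add(event);
                size--;
            }
        }
        // Fail loudly instead of silently returning when no side event arrived.
        if (sideResults.isEmpty()) {
            throw new AssertionError("Expected at least one side event");
        }
        return result;
    }

    public static void main(String[] args) {
        // Integers play the role of side events here; strings are main events.
        Iterator<Object> events = Arrays.<Object>asList("a", 1, "b", 2).iterator();
        System.out.println(fetchResultsExcept(events, 2, 0));
    }
}
```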

        }
    }
}
return null;

Copilot AI Apr 7, 2026


If the savepoint never succeeds after retries, the method returns null, and the test proceeds as if it had a savepoint path. This can lead to misleading failures (or restoring without a savepoint). Prefer throwing an exception (e.g., TimeoutException/AssertionError) once retries are exhausted so the test fails fast with a clear reason.

Suggested change
-    return null;
+    throw new AssertionError(
+            String.format(
+                    "Failed to trigger savepoint in directory '%s' after %d retries.",
+                    savepointDirectory, retryTimes));

Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

Comment on lines +82 to +83
case Types.FLOAT:
return DataTypes.FLOAT();

Copilot AI Apr 7, 2026


JDBC Types.FLOAT typically represents a double-precision floating type (while Types.REAL is single-precision). Mapping both REAL and FLOAT to DataTypes.FLOAT() can lose precision for SQL Server FLOAT columns and contradicts the expected behavior in the added type IT coverage. Consider mapping Types.REAL -> DataTypes.FLOAT() and Types.FLOAT -> DataTypes.DOUBLE() (or use Debezium column metadata to decide).

Suggested change
-            case Types.FLOAT:
-                return DataTypes.FLOAT();
+                return DataTypes.FLOAT();
+            case Types.FLOAT:
Author


done.
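Since `DataTypes` belongs to the Flink CDC API, the corrected mapping direction can be sketched dependency-free with plain type-name strings (method and class names here are illustrative): JDBC REAL stays single-precision, while JDBC FLOAT maps to double precision.

```java
import java.sql.Types;

public class JdbcFloatMapping {

    // Sketch of the split the reviewer suggests: JDBC REAL is
    // single-precision, while JDBC FLOAT (like SQL Server FLOAT by default)
    // is double-precision and should not be narrowed.
    static String toCdcTypeName(int jdbcType) {
        switch (jdbcType) {
            case Types.REAL:
                return "FLOAT";  // single-precision
            case Types.FLOAT:
            case Types.DOUBLE:
                return "DOUBLE"; // double-precision
            default:
                throw new IllegalArgumentException("Unhandled JDBC type: " + jdbcType);
        }
    }

    public static void main(String[] args) {
        System.out.println(toCdcTypeName(Types.REAL));  // FLOAT
        System.out.println(toCdcTypeName(Types.FLOAT)); // DOUBLE
    }
}
```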


Map<String, String> configMap = config.toMap();
mergeJdbcPropertiesIntoDebeziumProperties(configMap);
String databaseName = getValidateDatabaseName(tables).orElseThrow();

Copilot AI Apr 7, 2026


Two issues in database-name extraction/validation:

1) If all entries are skipped (e.g., due to format issues), getValidateDatabaseName(...) can return Optional.empty(), and the caller uses .orElseThrow() without a message, resulting in a NoSuchElementException that doesn't explain the config problem.
2) The length validation error message uses TABLES.key() (the literal key name) instead of the invalid value (e.g., trimmedTableName or the full tables string), making troubleshooting harder.

Recommendation: treat any non-db.schema.table entry as a configuration error (fail fast with ValidationException/IllegalArgumentException containing the actual option value) and avoid returning empty when the option is required.

Author


I have removed `orElseThrow` and handled the empty case a different way.

Comment on lines +393 to +397
checkState(
tableNameParts.length == 3,
String.format(
"Tables format must db.schema.table, can not 'tables' = %s",
TABLES.key()));

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two issues in database-name extraction/validation:

1) If all entries are skipped (e.g., due to format issues), getValidateDatabaseName(...) can return Optional.empty(), and the caller uses .orElseThrow() without a message, resulting in a NoSuchElementException that doesn't explain the config problem.
2) The length validation error message uses TABLES.key() (the literal key name) instead of the invalid value (e.g., trimmedTableName or the full tables string), making troubleshooting harder.

Recommendation: treat any non-db.schema.table entry as a configuration error (fail fast with ValidationException/IllegalArgumentException containing the actual option value) and avoid returning empty when the option is required.

Author


done.

.stringType()
.noDefaultValue()
.withDescription(
"Name of the SQL Server database to use when connecting to the SQL Server database server.");

Copilot AI Apr 7, 2026


The username option description is incorrect: it describes a database name rather than the user name. Update the description to avoid confusing users configuring the connector.

Suggested change
-    "Name of the SQL Server database to use when connecting to the SQL Server database server.");
+    "Name of the SQL Server user to use when connecting to the SQL Server database server.");

Author


done.

TimeUnit.MILLISECONDS.sleep(interval.toMillis());
}
throw new TimeoutException(
"Ran out of time when waiting for " + description + " to success.");

Copilot AI Apr 7, 2026


Fix grammar in the timeout message: replace 'to success' with 'to succeed'.

Suggested change
-    "Ran out of time when waiting for " + description + " to success.");
+    "Ran out of time when waiting for " + description + " to succeed.");

Author


done.

Comment on lines +32 to +35
// SQL Server doesn't have special geometry types like PostgreSQL,
// so we can use the default implementation from the parent class.
// If there are SQL Server specific types that need special handling,
// they can be added here by overriding the inferStruct method.

Copilot AI Apr 7, 2026


The comment is misleading: SQL Server does have spatial types (geometry, geography) and other special types (e.g., hierarchyid). If the intent is 'no special handling needed currently', consider rephrasing to avoid stating that these types don't exist.

Suggested change
-// SQL Server doesn't have special geometry types like PostgreSQL,
-// so we can use the default implementation from the parent class.
-// If there are SQL Server specific types that need special handling,
-// they can be added here by overriding the inferStruct method.
+// SQL Server has database-specific types, but no special handling is currently
+// needed here, so this class uses the default implementation from the parent class.
+// If SQL Server-specific types require special handling in the future,
+// it can be added here by overriding the inferStruct method.

Author


done.

INSERT INTO products(name,description,weight)
VALUES ('rocks','box of assorted rocks',5.3);
INSERT INTO products(name,description,weight)
VALUES ('jacket','water resistent black wind breaker',0.1);

Copilot AI Apr 7, 2026


Typo in sample data: 'resistent' should be 'resistant'. If any assertions depend on the exact string, update those expected values accordingly.

Suggested change
-VALUES ('jacket','water resistent black wind breaker',0.1);
+VALUES ('jacket','water resistant black wind breaker',0.1);

Author


done.

int port = config.get(PORT);
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String chunkKeyColumn = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);


[Suggestion] SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN is read from config at this line and passed to configFactory.chunkKeyColumn(), but it is NOT declared in optionalOptions(). This means FactoryHelper.validateExcept() will reject any user-provided value as an "unknown option," making the chunk key column configuration inaccessible to users.

Suggested fix: Add options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); to optionalOptions(), or remove the config read and .chunkKeyColumn() call if the option is intentionally unsupported.

— glm-5.1

Author


done.


Map<String, String> configMap = config.toMap();
mergeJdbcPropertiesIntoDebeziumProperties(configMap);
String databaseName = getValidateDatabaseName(tables).orElseThrow();


[Suggestion] getValidateDatabaseName(tables).orElseThrow() throws a bare NoSuchElementException with no contextual message when the tables config has no entries matching the db.schema.table format. This makes debugging difficult for users.

Suggested fix:

String databaseName =
    getValidateDatabaseName(tables)
        .orElseThrow(() -> new IllegalArgumentException(
            "Cannot determine database name from 'tables' option: " + tables
            + ". Expected format: database.schema.table"));

— glm-5.1

Author


done.

.withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode");

public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =


[Suggestion] SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND/LOWER_BOUND are defined with identical keys, defaults, and fallback keys as CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND/LOWER_BOUND. The factory reads values from CHUNK_KEY_* but error messages reference SPLIT_KEY_*, creating confusing inconsistency. Other pipeline connectors (e.g., Oracle) only have the CHUNK_KEY_* variants.

Suggested fix: Remove SPLIT_KEY_* variants and update error messages in validateDistributionFactorUpper/validateDistributionFactorLower to use CHUNK_KEY_*.key().

— glm-5.1

Author


done.

trimmedTableName.split(
"(?<!\\\\)\\.", -1); // Use -1 to avoid ignoring trailing empty elements

checkState(


[Suggestion] isValidSqlServerDbName uses regex [a-zA-Z_@#][a-zA-Z0-9_@#$]* which rejects valid SQL Server identifiers that use bracket-quoting with hyphens, spaces, or other special characters (e.g., [my-database]). Real-world databases can have such names.

Suggested fix: Either relax isValidSqlServerDbName to accept a wider set of characters matching SQL Server's actual naming rules, or remove this validation and rely on the downstream JDBC connection failing if the name is truly invalid.

— glm-5.1

Author


done.
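A hypothetical relaxed check in the direction the reviewer suggests: keep the original regular-identifier regex, but additionally accept bracket-quoted names such as `[my-database]`. The class and method names are illustrative, not taken from the PR.

```java
import java.util.regex.Pattern;

public class SqlServerNameCheck {

    // Original rule from the PR for unquoted (regular) identifiers.
    private static final Pattern REGULAR_IDENTIFIER =
            Pattern.compile("[a-zA-Z_@#][a-zA-Z0-9_@#$]*");

    // Relaxed sketch: also accept bracket-quoted names, in which any
    // embedded ']' must be doubled as ']]'.
    static boolean isValidSqlServerDbName(String name) {
        if (name.length() >= 2 && name.startsWith("[") && name.endsWith("]")) {
            String inner = name.substring(1, name.length() - 1);
            // After removing escaped ']]' pairs, no lone ']' may remain.
            return !inner.replace("]]", "").contains("]");
        }
        return REGULAR_IDENTIFIER.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidSqlServerDbName("inventory"));     // true
        System.out.println(isValidSqlServerDbName("[my-database]")); // true
        System.out.println(isValidSqlServerDbName("my-database"));   // false
    }
}
```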

"Tables format must db.schema.table, can not 'tables' = %s",
TABLES.key()));
String currentDbName = tableNameParts[0];



[Suggestion] The error message "Tables format must db.schema.table, can not 'tables' = %s" is grammatically broken (missing "be" after "must") and the format argument is TABLES.key() (the option name "tables") rather than the actual malformed table identifier. This makes the error unhelpful for users diagnosing what went wrong.

Suggested fix:

checkState(
    tableNameParts.length == 3,
    String.format(
        "Table '%s' does not match the expected 'database.schema.table' format. "
            + "Please check the value of option '%s'.",
        trimmedTableName, TABLES.key()));

— glm-5.1

Author


done.

new EventTypeInfo())
.executeAndCollect();

Thread.sleep(5_000);


[Suggestion] Thread.sleep(5_000) is used as a synchronization barrier without any condition check. If the source hasn't finished the snapshot within 5 seconds, subsequent operations may read events out of order. Conversely, on fast machines this wastes time. The same pattern appears at line 128 and in SqlServerPipelineNewlyAddedTableITCase.java:327 with Thread.sleep(1000L).

Suggested fix: Replace with a polling loop (e.g., SqlServerSourceTestUtils.loopCheck) that waits until the expected condition is met, similar to the pattern already used in SqlServerParallelizedPipelineITCase.

— glm-5.1

Author


done.
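A minimal polling helper in the spirit of the suggested loopCheck pattern, assuming nothing beyond the JDK (the class and method names here are illustrative, not the ones in SqlServerSourceTestUtils):

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public class PollUntil {

    // Wait until the condition holds, polling at a fixed interval, instead of
    // sleeping for a fixed amount of time and hoping the source caught up.
    static void waitUntil(
            BooleanSupplier condition, Duration timeout, Duration interval, String description)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return;
            }
            Thread.sleep(interval.toMillis());
        }
        throw new TimeoutException(
                "Ran out of time when waiting for " + description + " to succeed.");
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // Condition becomes true after ~50 ms; returns promptly on fast machines
        // instead of always burning a full fixed sleep.
        waitUntil(
                () -> System.currentTimeMillis() - start >= 50,
                Duration.ofSeconds(5),
                Duration.ofMillis(10),
                "demo condition");
        System.out.println("condition met");
    }
}
```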

return namespaceNames;
}

public static String quote(String dbOrTableName) {


[Suggestion] The quote() method wraps identifiers in [...] but does not escape embedded ] characters as ]]. The source connector module's SqlServerUtils.quote() correctly does "[" + name.replace("]", "]]") + "]". This inconsistency means database names containing ] would produce malformed SQL in the pipeline module's listSchemas() method.

Suggested fix:

return "[" + dbOrTableName.replace("]", "]]") + "]";

— glm-5.1

Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.
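The suggested escaping as a runnable one-liner with a quick demonstration (the wrapper class name is illustrative):

```java
public class BracketQuote {

    // Matches the source-connector behavior the review points to: wrap the
    // identifier in [...] and double any embedded ']' so the quoted name
    // stays well-formed in generated SQL.
    static String quote(String dbOrTableName) {
        return "[" + dbOrTableName.replace("]", "]]") + "]";
    }

    public static void main(String[] args) {
        System.out.println(quote("inventory")); // [inventory]
        System.out.println(quote("my]db"));     // [my]]db]
    }
}
```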


@wenshao wenshao left a comment


Review: SQL Server Pipeline Connector

Reviewed the full diff (35 files, ~6000 lines). The implementation follows established patterns from other pipeline connectors (MySQL, PostgreSQL, Oracle) and is well-structured overall.

Summary

7 inline suggestions posted covering:

  • Missing optionalOptions() declaration for chunk key column config
  • Bare NoSuchElementException without contextual message
  • Duplicate config option constants (SPLIT_KEY_* vs CHUNK_KEY_*)
  • Overly strict database name validation regex
  • Grammatically broken error message
  • Thread.sleep without condition checks in tests (flaky risk)
  • quote() method missing ']]' escaping of embedded ']' characters

Additionally, there is a potential issue worth a closer look: the error 313 handling in SqlServerStreamingChangeEventSource.java may not advance the LSN position, which could cause an infinite retry loop if the error is persistent (CDC data purged).

Reviewed by glm-5.1 via Qwen Code /review

@leonardBang leonardBang self-requested a review April 7, 2026 04:15
…rce connectors

- handle null namespace in metadata accessor
- register chunk-key option and improve tables validation messages
- relax SQL Server database-name validation and escape bracket quoting
- fix FLOAT/REAL type mapping and clean up option/docs wording
- tighten tests for chunk-key support, malformed tables format, and quote escaping
- fail fast on SQL Server error 313 when CDC LSN window has been purged
@Daishuyuan
Author

Thanks for the review. I addressed the comments in the latest commits.

Updated:

  • added null namespace fallback handling in the metadata accessor
  • registered scan.incremental.snapshot.chunk.key-column
  • improved tables validation and related error messages
  • relaxed database name validation for SQL Server
  • fixed chunk distribution factor error messages to use CHUNK_KEY_*
  • fixed bracket escaping in identifier quoting
  • corrected REAL / FLOAT type mapping
  • cleaned up a few test and wording issues raised in review
  • added regression tests for the above cases

I also updated the SQL Server error 313 handling in the source connector. Instead of retrying with the same stale LSN, it now fails fast with a clear message that the CDC range is no longer available and suggests re-snapshotting or advancing to the minimum available LSN.
