Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,10 @@
*/
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.MYSQL_SOURCE_TYPE;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.POSTGRES_SOURCE_TYPE;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.RUN_MODE_REGULAR;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.RUN_MODE_RETRY_ALL_DLQ;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.RUN_MODE_RETRY_DLQ;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
Expand All @@ -37,25 +31,22 @@
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.CassandraConnectionConfig;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConfigParser;
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConnectionConfig;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraConfigFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ISecretManagerAccessor;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.sourceddl.CassandraInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.MySqlInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.PostgreSQLInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
import com.google.cloud.teleport.v2.templates.SpannerToSourceDb.Options;
import com.google.cloud.teleport.v2.templates.changestream.TrimmedShardedDataChangeRecord;
import com.google.cloud.teleport.v2.templates.constants.Constants;
import com.google.cloud.teleport.v2.templates.dbutils.processor.ISourceConnector;
import com.google.cloud.teleport.v2.templates.dbutils.processor.SourceProcessorFactory;
import com.google.cloud.teleport.v2.templates.transforms.AssignShardIdFn;
import com.google.cloud.teleport.v2.templates.transforms.ConvertChangeStreamErrorRecordToFailsafeElementFn;
import com.google.cloud.teleport.v2.templates.transforms.ConvertDlqRecordToTrimmedShardedDataChangeRecordFn;
Expand All @@ -68,12 +59,6 @@
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -124,14 +109,6 @@ public class SpannerToSourceDb {

private static final Logger LOG = LoggerFactory.getLogger(SpannerToSourceDb.class);

// JDBC Drivers
private static final String MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver";
private static final String POSTGRESQL_DRIVER = "org.postgresql.Driver";

// JDBC URL Prefixes
private static final String MYSQL_JDBC_PREFIX = "jdbc:mysql://";
private static final String POSTGRESQL_JDBC_PREFIX = "jdbc:postgresql://";

/**
* Options supported by the pipeline.
*
Expand Down Expand Up @@ -653,18 +630,19 @@ public static PipelineResult run(Options options) {
.get(SpannerInformationSchemaProcessorTransform.SHADOW_TABLE_DDL_TAG)
.apply("View Shadow DDL", View.asSingleton());

ISourceConnector source = getSourceConnector(options);

List<Shard> shards = getShardList(options.getSourceType(), options.getSourceShardsFilePath());

// cassandra is always a single sharded migration.
// for JDBC, shards size and IsShardedMigration option is used below.
// for sources that support sharding, shards size and IsShardedMigration option are used below
// to further refine shardingMode.
// TODO Simplify this check
String shardingMode =
options.getSourceType().equals(CASSANDRA_SOURCE_TYPE)
? Constants.SHARDING_MODE_SINGLE_SHARD
: Constants.SHARDING_MODE_MULTI_SHARD;
source.isShardingSupported()
? Constants.SHARDING_MODE_MULTI_SHARD
: Constants.SHARDING_MODE_SINGLE_SHARD;

if (MYSQL_SOURCE_TYPE.equals(options.getSourceType())) {
validateMySQLNotReadOnly(shards);
}
source.validateNotReadOnly(shards);

SourceSchema sourceSchema = fetchSourceSchema(options, shards);
LOG.info("Source schema: {}", sourceSchema);
Expand Down Expand Up @@ -696,6 +674,16 @@ public static PipelineResult run(Options options) {
return pipeline.run();
}

static ISourceConnector getSourceConnector(Options options) {
ISourceConnector source = null;
try {
return SourceProcessorFactory.getSource(options.getSourceType());
} catch (Exception e) {
LOG.warn("can not run for unsupported source: " + options.getSourceType(), e);
throw new RuntimeException("can not run for unsupported source:" + options.getSourceType());
}
}

static void buildPipeline(
Pipeline pipeline,
Options options,
Expand Down Expand Up @@ -1046,94 +1034,12 @@ static DeadLetterQueueManager buildDlqManager(Options options) {
return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetryCount(), true);
}

static Connection createJdbcConnection(
Shard shard, String driverClassName, String jdbcUrlPrefix) {
try {
String sourceConnectionUrl =
new StringBuilder()
.append(jdbcUrlPrefix)
.append(shard.getHost())
.append(":")
.append(shard.getPort())
.append("/")
.append(shard.getDbName())
.toString();
HikariConfig config = new HikariConfig();
config.setJdbcUrl(sourceConnectionUrl);
config.setUsername(shard.getUserName());
config.setPassword(shard.getPassword());
config.setDriverClassName(driverClassName);
HikariDataSource ds = new HikariDataSource(config);
return ds.getConnection();
} catch (java.sql.SQLException e) {
LOG.error("Sql error while discovering jdbc schema: {}", e);
throw new RuntimeException(e);
}
}

/**
* Creates a {@link CqlSession} for the given {@link CassandraShard}.
*
* @param cassandraShard The shard containing connection details.
* @return A {@link CqlSession} instance.
*/
static CqlSession createCqlSession(CassandraShard cassandraShard) {
CqlSessionBuilder builder = CqlSession.builder();
DriverConfigLoader configLoader =
CassandraDriverConfigLoader.fromOptionsMap(cassandraShard.getOptionsMap());
builder.withConfigLoader(configLoader);
return builder.build();
}

static void validateMySQLNotReadOnly(List<Shard> shards) {
for (Shard shard : shards) {
try (Connection conn = createJdbcConnection(shard, MYSQL_DRIVER, MYSQL_JDBC_PREFIX)) {
if (conn != null) {
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT @@read_only")) {
if (rs != null && rs.next() && rs.getInt(1) == 1) {
throw new RuntimeException(
"MySQL destination is in read-only mode for shard: " + shard.getLogicalShardId());
}
}
}
} catch (SQLException e) {
LOG.error(
"Error checking MySQL read-only status for shard {}: {}",
shard.getLogicalShardId(),
e.getMessage());
throw new RuntimeException("Error checking MySQL read-only status", e);
}
}
}

static SourceSchema fetchSourceSchema(Options options, List<Shard> shards) {
try {
return getSourceSchema(options, shards);
} catch (SQLException e) {
throw new RuntimeException("Unable to discover jdbc schema", e);
}
}

static SourceSchema getSourceSchema(Options options, List<Shard> shards) throws SQLException {
if (options.getSourceType().equals(MYSQL_SOURCE_TYPE)) {
try (Connection connection =
createJdbcConnection(shards.get(0), MYSQL_DRIVER, MYSQL_JDBC_PREFIX)) {
return new MySqlInformationSchemaScanner(connection, shards.get(0).getDbName()).scan();
}
} else if (options.getSourceType().equals(POSTGRES_SOURCE_TYPE)) {
try (Connection connection =
createJdbcConnection(shards.get(0), POSTGRESQL_DRIVER, POSTGRESQL_JDBC_PREFIX)) {
return new PostgreSQLInformationSchemaScanner(
connection, shards.get(0).getDbName(), shards.get(0).getNamespace())
.scan();
}
} else {
try (CqlSession session = createCqlSession((CassandraShard) shards.get(0))) {
return new CassandraInformationSchemaScanner(
session, ((CassandraShard) shards.get(0)).getKeySpaceName())
.scan();
}
ISourceConnector sourceInstance = SourceProcessorFactory.getSource(options.getSourceType());
return sourceInstance.getSourceSchema(shards.get(0));
} catch (Exception e) {
throw new RuntimeException("Unable to discover source schema", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.google.cloud.teleport.v2.spanner.migrations.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.dbutils.dml.CassandraTypeHandler;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
import com.google.cloud.teleport.v2.templates.source.cassandra.CassandraTypeHandler;

public class CassandraDao implements IDao<DMLGeneratorResponse> {
private final String cassandraUrl;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (C) 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.dbutils.processor;

import com.google.cloud.teleport.v2.spanner.migrations.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.IDao;
import com.google.cloud.teleport.v2.templates.dbutils.dml.IDMLGenerator;
import java.util.List;

/**
* Interface representing a connector to the external source/destination database (e.g., MySQL,
* PostgreSQL, Cassandra) involved in the Spanner migration.
*
* <p>An implementation of this interface encapsulates all dialect-specific configurations,
* connection management, metadata discovery, and validation logic required by the pipeline to
* integrate with the target database.
*/
public interface ISourceConnector {

/**
* Returns the dialect-specific DML Generator for this database.
*
* @return An implementation of {@link IDMLGenerator} to generate INSERT/UPDATE/DELETE queries.
*/
IDMLGenerator getDmlGenerator();

/**
* Returns the dialect-specific Connection Helper for this database.
*
* @return An implementation of {@link IConnectionHelper} to manage connection pools.
*/
IConnectionHelper getConnectionHelper();

/**
* Constructs and returns the JDBC or connection URL for a given shard.
*
* @param shard The shard configuration containing host, port, and database name.
* @return The connection URL string.
*/
String getConnectionUrl(Shard shard);

/**
* Returns a dialect-specific Data Access Object (DAO) for a given shard.
*
* @param shard The shard configuration to initialize the DAO with.
* @return An implementation of {@link IDao} for executing statements against the shard.
*/
IDao getDao(Shard shard);

/**
* Initializes the connection helper and connection pools for all shards.
*
* @param shards The list of all shards to establish connection pools for.
* @param maxConnections The maximum number of connections allowed in the pool.
*/
void initConnectionHelper(List<Shard> shards, int maxConnections);

/**
* Scans the database schema for a given shard and returns its structured schema.
*
* @param shard The shard to scan.
* @return The discovered {@link SourceSchema}.
*/
SourceSchema getSourceSchema(Shard shard);

/**
* Validates that the target database shards are not in read-only mode, ensuring they are
* writeable by the replication pipeline.
*
* @param shards The list of shards to validate.
* @throws RuntimeException if any shard is determined to be read-only.
*/
void validateNotReadOnly(List<Shard> shards);

/**
* Returns whether this database type supports sharding.
*
* @return true if sharding is supported (e.g., MySQL, PostgreSQL), false otherwise (e.g.,
* Cassandra).
*/
boolean isShardingSupported();
}
Loading
Loading