Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.google.cloud.teleport.v2.spanner.migrations.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.dbutils.dml.CassandraTypeHandler;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
import com.google.cloud.teleport.v2.templates.source.cassandra.CassandraTypeHandler;

public class CassandraDao implements IDao<DMLGeneratorResponse> {
private final String cassandraUrl;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (C) 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.dbutils.processor;

import com.google.cloud.teleport.v2.spanner.migrations.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.IDao;
import com.google.cloud.teleport.v2.templates.dbutils.dml.IDMLGenerator;
import java.util.List;

/**
* Interface representing a connector to the external source/destination database (e.g., MySQL,
* PostgreSQL, Cassandra) involved in the Spanner migration.
*
* <p>An implementation of this interface encapsulates all dialect-specific configurations,
* connection management, metadata discovery, and validation logic required by the pipeline to
* integrate with the target database.
*/
public interface ISourceConnector {

/**
* Returns the dialect-specific DML Generator for this database.
*
* @return An implementation of {@link IDMLGenerator} to generate INSERT/UPDATE/DELETE queries.
*/
IDMLGenerator getDmlGenerator();

/**
* Returns the dialect-specific Connection Helper for this database.
*
* @return An implementation of {@link IConnectionHelper} to manage connection pools.
*/
IConnectionHelper getConnectionHelper();

/**
* Constructs and returns the JDBC or connection URL for a given shard.
*
* @param shard The shard configuration containing host, port, and database name.
* @return The connection URL string.
*/
String getConnectionUrl(Shard shard);

/**
* Returns a dialect-specific Data Access Object (DAO) for a given shard.
*
* @param shard The shard configuration to initialize the DAO with.
* @return An implementation of {@link IDao} for executing statements against the shard.
*/
IDao getDao(Shard shard);

/**
* Initializes the connection helper and connection pools for all shards.
*
* @param shards The list of all shards to establish connection pools for.
* @param maxConnections The maximum number of connections allowed in the pool.
*/
void initConnectionHelper(List<Shard> shards, int maxConnections);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,119 +15,42 @@
*/
package com.google.cloud.teleport.v2.templates.dbutils.processor;

import com.google.cloud.teleport.v2.spanner.migrations.connection.ConnectionHelperRequest;
import com.google.cloud.teleport.v2.spanner.migrations.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.spanner.migrations.connection.JdbcConnectionHelper;
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.templates.constants.Constants;
import com.google.cloud.teleport.v2.templates.dbutils.connection.CassandraConnectionHelper;
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.CassandraDao;
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.IDao;
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.JdbcDao;
import com.google.cloud.teleport.v2.templates.dbutils.dml.CassandraDMLGenerator;
import com.google.cloud.teleport.v2.templates.dbutils.dml.IDMLGenerator;
import com.google.cloud.teleport.v2.templates.dbutils.dml.MySQLDMLGenerator;
import com.google.cloud.teleport.v2.templates.dbutils.dml.PostgreSQLDMLGenerator;
import com.google.cloud.teleport.v2.templates.exceptions.UnsupportedSourceException;
import com.google.cloud.teleport.v2.templates.source.cassandra.CassandraSourceConnector;
import com.google.cloud.teleport.v2.templates.source.mysql.MySQLSourceConnector;
import com.google.cloud.teleport.v2.templates.source.postgres.PostgreSQLSourceConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

public class SourceProcessorFactory {
private static Map<String, IDMLGenerator> dmlGeneratorMap = new HashMap<>();

private static Map<String, IConnectionHelper> connectionHelperMap = new HashMap<>();

private static final Map<String, String> driverMap =
Map.of(
Constants.SOURCE_MYSQL,
"com.mysql.cj.jdbc.Driver", // MySQL JDBC Driver
Constants.SOURCE_CASSANDRA,
"com.datastax.oss.driver.api.core.CqlSession", // Cassandra Session Class
Constants.SOURCE_POSTGRESQL,
"org.postgresql.Driver" // PostgreSQL JDBC Driver
);

private static Map<String, Function<Shard, String>> connectionUrl = new HashMap<>();
private static final Map<String, ISourceConnector> sourceMap = new HashMap<>();

static {
dmlGeneratorMap.put(Constants.SOURCE_MYSQL, new MySQLDMLGenerator());
dmlGeneratorMap.put(Constants.SOURCE_CASSANDRA, new CassandraDMLGenerator());
dmlGeneratorMap.put(Constants.SOURCE_POSTGRESQL, new PostgreSQLDMLGenerator());

connectionHelperMap.put(Constants.SOURCE_MYSQL, new JdbcConnectionHelper());
connectionHelperMap.put(Constants.SOURCE_CASSANDRA, new CassandraConnectionHelper());
connectionHelperMap.put(Constants.SOURCE_POSTGRESQL, new JdbcConnectionHelper());

connectionUrl.put(
Constants.SOURCE_MYSQL,
shard ->
"jdbc:mysql://" + shard.getHost() + ":" + shard.getPort() + "/" + shard.getDbName());
connectionUrl.put(
Constants.SOURCE_POSTGRESQL,
shard ->
"jdbc:postgresql://"
+ shard.getHost()
+ ":"
+ shard.getPort()
+ "/"
+ shard.getDbName());
connectionUrl.put(
Constants.SOURCE_CASSANDRA,
shard -> {
CassandraShard cassandraShard = (CassandraShard) shard;
return cassandraShard.getHost()
+ ":"
+ cassandraShard.getPort()
+ "/"
+ cassandraShard.getUserName()
+ "/"
+ cassandraShard.getKeySpaceName();
});
sourceMap.put(Constants.SOURCE_MYSQL, new MySQLSourceConnector());
sourceMap.put(Constants.SOURCE_POSTGRESQL, new PostgreSQLSourceConnector());
sourceMap.put(Constants.SOURCE_CASSANDRA, new CassandraSourceConnector());
}

private static Map<String, BiFunction<List<Shard>, Integer, ConnectionHelperRequest>>
connectionHelperRequestFactory =
Map.of(
Constants.SOURCE_MYSQL,
(shards, maxConnections) ->
new ConnectionHelperRequest(
shards,
null,
maxConnections,
driverMap.get(Constants.SOURCE_MYSQL),
"SET SESSION net_read_timeout=1200", // To avoid timeouts at the network layer
"jdbc:mysql://"),
Constants.SOURCE_POSTGRESQL,
(shards, maxConnections) ->
new ConnectionHelperRequest(
shards,
null,
maxConnections,
driverMap.get(Constants.SOURCE_POSTGRESQL),
null,
"jdbc:postgresql://"),
Constants.SOURCE_CASSANDRA,
(shards, maxConnections) ->
new ConnectionHelperRequest(
shards,
null,
maxConnections,
driverMap.get(Constants.SOURCE_CASSANDRA),
null, // No specific initialization query for Cassandra
null));
public static void registerSource(String sourceName, ISourceConnector source) {
sourceMap.put(sourceName, source);
}

// for unit testing purposes
public static void setConnectionHelperMap(Map<String, IConnectionHelper> connectionHelper) {
connectionHelperMap = connectionHelper;
public static Map<String, ISourceConnector> getSourceMap() {
return new HashMap<>(sourceMap);
}

static Map<String, IConnectionHelper> getConnectionHelperMap() {
return connectionHelperMap;
// for unit testing purposes
public static void setSourceMap(Map<String, ISourceConnector> map) {
sourceMap.clear();
sourceMap.putAll(map);
}

/**
Expand All @@ -141,68 +64,25 @@ static Map<String, IConnectionHelper> getConnectionHelperMap() {
*/
public static SourceProcessor createSourceProcessor(
String source, List<Shard> shards, int maxConnections) throws UnsupportedSourceException {
IDMLGenerator dmlGenerator = getDMLGenerator(source);
initializeConnectionHelper(source, shards, maxConnections);
Map<String, IDao> sourceDaoMap = createSourceDaoMap(source, shards);
ISourceConnector sourceInstance = getSource(source);

return SourceProcessor.builder().dmlGenerator(dmlGenerator).sourceDaoMap(sourceDaoMap).build();
}
IDMLGenerator dmlGenerator = sourceInstance.getDmlGenerator();
sourceInstance.initConnectionHelper(shards, maxConnections);
Map<String, IDao> sourceDaoMap = createSourceDaoMap(sourceInstance, shards);

private static IDMLGenerator getDMLGenerator(String source) throws UnsupportedSourceException {
return Optional.ofNullable(dmlGeneratorMap.get(source))
.orElseThrow(
() ->
new UnsupportedSourceException("Invalid source type for DML generator: " + source));
}

private static IConnectionHelper getConnectionHelper(String source)
throws UnsupportedSourceException {
return Optional.ofNullable(connectionHelperMap.get(source))
.orElseThrow(
() ->
new UnsupportedSourceException(
"Invalid source type for connection helper: " + source));
}

private static void initializeConnectionHelper(
String source, List<Shard> shards, int maxConnections) throws UnsupportedSourceException {
IConnectionHelper connectionHelper = getConnectionHelper(source);
if (!connectionHelper.isConnectionPoolInitialized()) {
ConnectionHelperRequest request =
createConnectionHelperRequest(source, shards, maxConnections);
connectionHelper.init(request);
}
return SourceProcessor.builder().dmlGenerator(dmlGenerator).sourceDaoMap(sourceDaoMap).build();
}

private static ConnectionHelperRequest createConnectionHelperRequest(
String source, List<Shard> shards, int maxConnections) throws UnsupportedSourceException {
return Optional.ofNullable(connectionHelperRequestFactory.get(source))
.map(factory -> factory.apply(shards, maxConnections))
.orElseThrow(
() ->
new UnsupportedSourceException(
"Invalid source type for ConnectionHelperRequest: " + source));
public static ISourceConnector getSource(String source) throws UnsupportedSourceException {
return Optional.ofNullable(sourceMap.get(source))
.orElseThrow(() -> new UnsupportedSourceException("Invalid source type: " + source));
}

private static Map<String, IDao> createSourceDaoMap(String source, List<Shard> shards)
throws UnsupportedSourceException {
Function<Shard, String> urlGenerator =
Optional.ofNullable(connectionUrl.get(source))
.orElseThrow(
() ->
new UnsupportedSourceException(
"Invalid source type for URL generation: " + source));

private static Map<String, IDao> createSourceDaoMap(
ISourceConnector sourceInstance, List<Shard> shards) {
Map<String, IDao> sourceDaoMap = new HashMap<>();
for (Shard shard : shards) {
String connectionUrl = urlGenerator.apply(shard);
IDao sqlDao;
if (source.equals(Constants.SOURCE_MYSQL) || source.equals(Constants.SOURCE_POSTGRESQL)) {
sqlDao = new JdbcDao(connectionUrl, shard.getUserName(), getConnectionHelper(source));
} else {
sqlDao = new CassandraDao(connectionUrl, shard.getUserName(), getConnectionHelper(source));
}
sourceDaoMap.put(shard.getLogicalShardId(), sqlDao);
sourceDaoMap.put(shard.getLogicalShardId(), sourceInstance.getDao(shard));
}
return sourceDaoMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.dbutils.dml;
package com.google.cloud.teleport.v2.templates.source.cassandra;

import com.google.cloud.teleport.v2.spanner.ddl.Column;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
Expand All @@ -22,6 +22,7 @@
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceColumn;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceTable;
import com.google.cloud.teleport.v2.templates.dbutils.dml.IDMLGenerator;
import com.google.cloud.teleport.v2.templates.exceptions.InvalidDMLGenerationException;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (C) 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.source.cassandra;

import com.google.cloud.teleport.v2.spanner.migrations.connection.ConnectionHelperRequest;
import com.google.cloud.teleport.v2.spanner.migrations.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.templates.dbutils.connection.CassandraConnectionHelper;
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.CassandraDao;
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.IDao;
import com.google.cloud.teleport.v2.templates.dbutils.dml.IDMLGenerator;
import com.google.cloud.teleport.v2.templates.dbutils.processor.ISourceConnector;
import java.util.List;

public class CassandraSourceConnector implements ISourceConnector {

private final IConnectionHelper connectionHelper = new CassandraConnectionHelper();

@Override
public IDMLGenerator getDmlGenerator() {
return new CassandraDMLGenerator();
}

@Override
public IConnectionHelper getConnectionHelper() {
return connectionHelper;
}

@Override
public String getConnectionUrl(Shard shard) {
CassandraShard cassandraShard = (CassandraShard) shard;
return cassandraShard.getHost()
+ ":"
+ cassandraShard.getPort()
+ "/"
+ cassandraShard.getUserName()
+ "/"
+ cassandraShard.getKeySpaceName();
}

@Override
public IDao getDao(Shard shard) {
return new CassandraDao(getConnectionUrl(shard), shard.getUserName(), getConnectionHelper());
}

@Override
public void initConnectionHelper(List<Shard> shards, int maxConnections) {
if (!connectionHelper.isConnectionPoolInitialized()) {
ConnectionHelperRequest request =
new ConnectionHelperRequest(
shards,
null,
maxConnections,
"com.datastax.oss.driver.api.core.CqlSession",
null,
null);
connectionHelper.init(request);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.dbutils.dml;
package com.google.cloud.teleport.v2.templates.source.cassandra;

import com.datastax.oss.driver.api.core.data.CqlDuration;
import com.google.cloud.teleport.v2.spanner.ddl.Column;
Expand Down
Loading
Loading