Skip to content

Commit 0992246

Browse files
feat(Spanner): integrate SourceConfigParser to centralize shard configuration loading for SpannerToSourceDb pipelines (#3840)
1 parent c488c3a commit 0992246

13 files changed

Lines changed: 1780 additions & 207 deletions

File tree

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/utils/CassandraConfigFileReader.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,23 @@ public List<Shard> getCassandraShard(String cassandraConfigFilePath) {
4444
LOG.info("Reading Cassandra configuration from: {}", cassandraConfigFilePath);
4545
OptionsMap optionsMap =
4646
CassandraDriverConfigLoader.getOptionsMapFromFile(cassandraConfigFilePath);
47-
CassandraShard shard = new CassandraShard(optionsMap);
48-
LOG.info("Successfully created CassandraShard: {}", shard);
49-
return Collections.singletonList(shard);
47+
return getCassandraShard(optionsMap);
5048
} catch (FileNotFoundException e) {
5149
throw new IllegalArgumentException(
5250
"Configuration file not found: " + cassandraConfigFilePath, e);
5351
}
5452
}
53+
54+
/**
55+
* Wraps the supplied, already-loaded Cassandra options map in a CassandraShard and returns it as
56+
* a single-element list.
57+
*
58+
* @param optionsMap the options map used to construct the CassandraShard.
59+
* @return a single-element list containing the created CassandraShard.
60+
*/
61+
public List<Shard> getCassandraShard(OptionsMap optionsMap) {
62+
CassandraShard shard = new CassandraShard(optionsMap);
63+
LOG.info("Successfully created CassandraShard: {}", shard);
64+
return Collections.singletonList(shard);
65+
}
5566
}

v2/spanner-to-sourcedb/README.md

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,9 @@ A few prerequisites must be considered before starting with reverse replication.
8686
8. Ensure that the [session file](https://googlecloudplatform.github.io/spanner-migration-tool/reports.html#session-file-ending-in-sessionjson) is uploaded to GCS (this requires a schema conversion to be done).
8787
9. Configuration Files Upload
8888
- **For MySQL:**
89-
[Source shards file](./RunnigReverseReplication.md#sample-source-shards-file-for-MySQL) already uploaded to GCS.
89+
[Source shards file](#sample-source-shards-file-for-MySQL) already uploaded to GCS.
9090
- **For Cassandra:**
91-
[Source file](./RunnigReverseReplication.md#Sample-source-File-for-Cassandra) already uploaded to GCS.
91+
[Source file](#sample-source-file-for-Cassandra) already uploaded to GCS.
9292
10. Resources needed for reverse replication incur cost. Make sure to read [cost](#cost).
9393
11. Reverse replication uses a shard identifier column per table to route the Spanner records to a given source shard. The column identified as the sharding column needs to be selected via Spanner Migration Tool when performing migration. The value of this column should be the logicalShardId value specified in the [source shard file](#sample-source-shards-file-for-MySQL). In the event that the shard identifier column is not an existing column, the application code needs to be changed to populate this shard identifier column when writing to Spanner. Or use a custom shard identifier plugin to supply the shard identifier. In case of single shard migrations, this step is skipped.
9494
12. The reverse replication pipeline uses GCS for dead letter queue handling. Ensure that the DLQ directory exists in GCS.
@@ -141,24 +141,26 @@ The database user password should be kept in [Secret Manager](#https://cloud.goo
141141
The file should be a JSON object whose `shardConfigs` field is a list of shard entries, as:
142142

143143
```json
144-
[
145-
{
146-
"logicalShardId": "shard1",
147-
"host": "10.11.12.13",
148-
"user": "root",
149-
"secretManagerUri":"projects/123/secrets/rev-cmek-cred-shard1/versions/latest",
150-
"port": "3306",
151-
"dbName": "db1"
152-
},
153-
{
154-
"logicalShardId": "shard2",
155-
"host": "10.11.12.14",
156-
"user": "root",
157-
"secretManagerUri":"projects/123/secrets/rev-cmek-cred-shard2/versions/latest",
158-
"port": "3306",
159-
"dbName": "db2"
160-
}
161-
]
144+
{
145+
"shardConfigs": [
146+
{
147+
"logicalShardId": "shard1",
148+
"host": "10.11.12.13",
149+
"user": "root",
150+
"secretManagerUri": "projects/123/secrets/rev-cmek-cred-shard1/versions/latest",
151+
"port": "3306",
152+
"dbName": "db1"
153+
},
154+
{
155+
"logicalShardId": "shard2",
156+
"host": "10.11.12.14",
157+
"user": "root",
158+
"secretManagerUri": "projects/123/secrets/rev-cmek-cred-shard2/versions/latest",
159+
"port": "3306",
160+
"dbName": "db2"
161+
}
162+
]
163+
}
162164
```
163165

164166

v2/spanner-to-sourcedb/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@
137137
<groupId>com.datastax.cassandra</groupId>
138138
<artifactId>cassandra-driver-core</artifactId>
139139
</exclusion>
140+
<exclusion>
141+
<groupId>com.datastax.oss</groupId>
142+
<artifactId>java-driver-core</artifactId>
143+
</exclusion>
140144
</exclusions>
141145
</dependency>
142146
<dependency>

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java

Lines changed: 61 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.teleport.v2.templates;
1717

18+
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE;
1819
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.MYSQL_SOURCE_TYPE;
1920
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.POSTGRES_SOURCE_TYPE;
2021
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.RUN_MODE_REGULAR;
@@ -38,12 +39,16 @@
3839
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
3940
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
4041
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
42+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.CassandraConnectionConfig;
43+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.JdbcShardConfig;
44+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConfigParser;
45+
import com.google.cloud.teleport.v2.spanner.migrations.source.config.SourceConnectionConfig;
4146
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
4247
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraConfigFileReader;
4348
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
4449
import com.google.cloud.teleport.v2.spanner.migrations.utils.DataflowWorkerMachineTypeUtils;
50+
import com.google.cloud.teleport.v2.spanner.migrations.utils.ISecretManagerAccessor;
4551
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
46-
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
4752
import com.google.cloud.teleport.v2.spanner.sourceddl.CassandraInformationSchemaScanner;
4853
import com.google.cloud.teleport.v2.spanner.sourceddl.MySqlInformationSchemaScanner;
4954
import com.google.cloud.teleport.v2.spanner.sourceddl.PostgreSQLInformationSchemaScanner;
@@ -61,6 +66,7 @@
6166
import com.google.cloud.teleport.v2.templates.transforms.UpdateDlqMetricsFn;
6267
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
6368
import com.google.cloud.teleport.v2.values.FailsafeElement;
69+
import com.google.common.base.Preconditions;
6470
import com.google.common.base.Strings;
6571
import com.zaxxer.hikari.HikariConfig;
6672
import com.zaxxer.hikari.HikariDataSource;
@@ -647,20 +653,14 @@ public static PipelineResult run(Options options) {
647653
.get(SpannerInformationSchemaProcessorTransform.SHADOW_TABLE_DDL_TAG)
648654
.apply("View Shadow DDL", View.asSingleton());
649655

650-
List<Shard> shards;
651-
String shardingMode;
652-
if (MYSQL_SOURCE_TYPE.equals(options.getSourceType())
653-
|| POSTGRES_SOURCE_TYPE.equals(options.getSourceType())) {
654-
ShardFileReader shardFileReader = new ShardFileReader(new SecretManagerAccessorImpl());
655-
shards = shardFileReader.getOrderedShardDetails(options.getSourceShardsFilePath());
656-
shardingMode = Constants.SHARDING_MODE_MULTI_SHARD;
656+
List<Shard> shards = getShardList(options.getSourceType(), options.getSourceShardsFilePath());
657657

658-
} else {
659-
CassandraConfigFileReader cassandraConfigFileReader = new CassandraConfigFileReader();
660-
shards = cassandraConfigFileReader.getCassandraShard(options.getSourceShardsFilePath());
661-
LOG.info("Cassandra config is: {}", shards.get(0));
662-
shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
663-
}
658+
// cassandra is always a single sharded migration.
659+
// for JDBC, shards size and IsShardedMigration option is used below.
660+
String shardingMode =
661+
options.getSourceType().equals(CASSANDRA_SOURCE_TYPE)
662+
? Constants.SHARDING_MODE_SINGLE_SHARD
663+
: Constants.SHARDING_MODE_MULTI_SHARD;
664664

665665
if (MYSQL_SOURCE_TYPE.equals(options.getSourceType())) {
666666
validateMySQLNotReadOnly(shards);
@@ -951,6 +951,53 @@ static void buildPipeline(
951951
.build());
952952
}
953953

954+
/**
955+
* Parses the source shards configuration file with SourceConfigParser and converts the parsed
956+
* config into a list of shards. This should be removed in Phase 2 of Standardizing config.
957+
*
958+
* @param sourceType The type of the source database.
959+
* @param sourceShardsFilePath The GCS path to the source shards configuration file.
960+
* @return A non-empty list of shards; a RuntimeException is thrown if parsing fails or the parsed config is neither a JDBC nor a Cassandra config.
961+
*/
962+
public static List<Shard> getShardList(String sourceType, String sourceShardsFilePath) {
963+
ISecretManagerAccessor secretManagerAccessor = new SecretManagerAccessorImpl();
964+
SourceConfigParser sourceConfigParser = new SourceConfigParser(secretManagerAccessor);
965+
SourceConnectionConfig sourceConnectionConfig;
966+
try {
967+
// Parse the source shards configuration file into the respective
968+
// SourceConnectionConfig; any failure is logged and rethrown unchecked.
969+
sourceConnectionConfig =
970+
sourceConfigParser.parseConfiguration(sourceType, sourceShardsFilePath);
971+
} catch (Exception e) {
972+
LOG.error("Error parsing source config", e);
973+
throw new RuntimeException("Error parsing source config", e);
974+
}
975+
List<Shard> shards;
976+
if (sourceConnectionConfig instanceof JdbcShardConfig) {
977+
shards = ((JdbcShardConfig) sourceConnectionConfig).getShardConfigs();
978+
LOG.info("JDBC shard config is parsed.");
979+
} else if (sourceConnectionConfig instanceof CassandraConnectionConfig) {
980+
CassandraConfigFileReader cassandraConfigFileReader = new CassandraConfigFileReader();
981+
shards =
982+
cassandraConfigFileReader.getCassandraShard(
983+
((CassandraConnectionConfig) sourceConnectionConfig).getOptionsMap());
984+
LOG.info("Cassandra shard config is parsed.");
985+
} else {
986+
String errorMessage =
987+
"Invalid source config for source type: "
988+
+ sourceType
989+
+ ". Source config parsed to: "
990+
+ sourceConnectionConfig.getClass()
991+
+ ". Source config file path: "
992+
+ sourceShardsFilePath;
993+
LOG.error(errorMessage);
994+
throw new RuntimeException(errorMessage);
995+
}
996+
Preconditions.checkArgument(
997+
shards != null && !shards.isEmpty(), "Shard list should have at least 1 element.");
998+
return shards;
999+
}
1000+
9541001
public static SpannerIO.ReadChangeStream getReadChangeStreamDoFn(
9551002
Options options, SpannerConfig spannerConfig) {
9561003

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDBShardedMySQLRetryAllDLQIT.java

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,8 @@
2222

2323
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
2424
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
25-
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
2625
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
2726
import com.google.common.io.Resources;
28-
import com.google.gson.Gson;
29-
import com.google.gson.JsonArray;
30-
import com.google.gson.JsonObject;
3127
import java.io.IOException;
3228
import java.time.Duration;
3329
import java.util.HashMap;
@@ -105,7 +101,11 @@ public void setUp() throws IOException, InterruptedException {
105101
gcsResourceManager = setUpSpannerITGcsResourceManager();
106102

107103
// Use generic multi-shard logic instead of base IT helper
108-
createAndUploadShardConfigToGcsMulti();
104+
createAndUploadShardConfigToGcs(
105+
gcsResourceManager,
106+
Map.of(
107+
"testShardA", jdbcResourceManagerShardA, "testShardB", jdbcResourceManagerShardB));
108+
;
109109

110110
// Upload overrides file
111111
gcsResourceManager.uploadArtifact(
@@ -394,35 +394,6 @@ private Integer getIntValueCaseInsensitive(Map<String, Object> map, String key)
394394
return null;
395395
}
396396

397-
private void createAndUploadShardConfigToGcsMulti() throws IOException {
398-
Shard shardA = new Shard();
399-
shardA.setLogicalShardId("testShardA");
400-
shardA.setUser(jdbcResourceManagerShardA.getUsername());
401-
shardA.setHost(jdbcResourceManagerShardA.getHost());
402-
shardA.setPassword(jdbcResourceManagerShardA.getPassword());
403-
shardA.setPort(String.valueOf(jdbcResourceManagerShardA.getPort()));
404-
shardA.setDbName(jdbcResourceManagerShardA.getDatabaseName());
405-
JsonObject jsObjA = (JsonObject) new Gson().toJsonTree(shardA).getAsJsonObject();
406-
jsObjA.remove("secretManagerUri"); // remove field secretManagerUri
407-
408-
Shard shardB = new Shard();
409-
shardB.setLogicalShardId("testShardB");
410-
shardB.setUser(jdbcResourceManagerShardB.getUsername());
411-
shardB.setHost(jdbcResourceManagerShardB.getHost());
412-
shardB.setPassword(jdbcResourceManagerShardB.getPassword());
413-
shardB.setPort(String.valueOf(jdbcResourceManagerShardB.getPort()));
414-
shardB.setDbName(jdbcResourceManagerShardB.getDatabaseName());
415-
JsonObject jsObjB = (JsonObject) new Gson().toJsonTree(shardB).getAsJsonObject();
416-
jsObjB.remove("secretManagerUri"); // remove field secretManagerUri
417-
418-
JsonArray ja = new JsonArray();
419-
ja.add(jsObjA);
420-
ja.add(jsObjB);
421-
String shardFileContents = ja.toString();
422-
LOG.info("Shard file contents: {}", shardFileContents);
423-
gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
424-
}
425-
426397
private void insertDataInSpanner() {
427398
com.google.cloud.spanner.Mutation customer1 =
428399
com.google.cloud.spanner.Mutation.newInsertOrUpdateBuilder("Customers")

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDBShardedMySQLRetryDLQIT.java

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,8 @@
2222

2323
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
2424
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
25-
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
2625
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
2726
import com.google.common.io.Resources;
28-
import com.google.gson.Gson;
29-
import com.google.gson.JsonArray;
30-
import com.google.gson.JsonObject;
3127
import com.google.pubsub.v1.SubscriptionName;
3228
import java.io.IOException;
3329
import java.time.Duration;
@@ -108,7 +104,10 @@ public void setUp() throws IOException, InterruptedException {
108104
SpannerToSourceDBShardedMySQLRetryDLQIT.MYSQL_SCHEMA_FILE_RESOURCE);
109105

110106
gcsResourceManager = setUpSpannerITGcsResourceManager();
111-
createAndUploadShardConfigToGcsMulti();
107+
createAndUploadShardConfigToGcs(
108+
gcsResourceManager,
109+
Map.of(
110+
"testShardA", jdbcResourceManagerShardA, "testShardB", jdbcResourceManagerShardB));
112111

113112
// Upload session file
114113
gcsResourceManager.uploadArtifact(
@@ -374,35 +373,6 @@ private Integer getIntValueCaseInsensitive(Map<String, Object> map, String key)
374373
return null;
375374
}
376375

377-
private void createAndUploadShardConfigToGcsMulti() throws IOException {
378-
Shard shardA = new Shard();
379-
shardA.setLogicalShardId("testShardA");
380-
shardA.setUser(jdbcResourceManagerShardA.getUsername());
381-
shardA.setHost(jdbcResourceManagerShardA.getHost());
382-
shardA.setPassword(jdbcResourceManagerShardA.getPassword());
383-
shardA.setPort(String.valueOf(jdbcResourceManagerShardA.getPort()));
384-
shardA.setDbName(jdbcResourceManagerShardA.getDatabaseName());
385-
JsonObject jsObjA = (JsonObject) new Gson().toJsonTree(shardA).getAsJsonObject();
386-
jsObjA.remove("secretManagerUri");
387-
388-
Shard shardB = new Shard();
389-
shardB.setLogicalShardId("testShardB");
390-
shardB.setUser(jdbcResourceManagerShardB.getUsername());
391-
shardB.setHost(jdbcResourceManagerShardB.getHost());
392-
shardB.setPassword(jdbcResourceManagerShardB.getPassword());
393-
shardB.setPort(String.valueOf(jdbcResourceManagerShardB.getPort()));
394-
shardB.setDbName(jdbcResourceManagerShardB.getDatabaseName());
395-
JsonObject jsObjB = (JsonObject) new Gson().toJsonTree(shardB).getAsJsonObject();
396-
jsObjB.remove("secretManagerUri");
397-
398-
JsonArray ja = new JsonArray();
399-
ja.add(jsObjA);
400-
ja.add(jsObjB);
401-
String shardFileContents = ja.toString();
402-
LOG.info("Shard file contents: {}", shardFileContents);
403-
gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
404-
}
405-
406376
private void insertDataInSpanner() {
407377
com.google.cloud.spanner.Mutation customer1 =
408378
com.google.cloud.spanner.Mutation.newInsertOrUpdateBuilder("Customers")

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomShardIT.java

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,7 @@
2323
import com.google.cloud.spanner.Mutation;
2424
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
2525
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
26-
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
2726
import com.google.common.io.Resources;
28-
import com.google.gson.Gson;
29-
import com.google.gson.JsonArray;
30-
import com.google.gson.JsonObject;
3127
import com.google.pubsub.v1.SubscriptionName;
3228
import java.io.IOException;
3329
import java.time.Duration;
@@ -103,7 +99,10 @@ public void setUp() throws IOException, InterruptedException {
10399
gcsResourceManager = setUpSpannerITGcsResourceManager();
104100
createAndUploadJarToGcs(gcsResourceManager);
105101

106-
createAndUploadShardConfigToGcs();
102+
createAndUploadShardConfigToGcs(
103+
gcsResourceManager,
104+
Map.of(
105+
"testShardA", jdbcResourceManagerShardA, "testShardB", jdbcResourceManagerShardB));
107106
gcsResourceManager.uploadArtifact(
108107
"input/session.json",
109108
Resources.getResource(SpannerToSourceDbCustomShardIT.SESSION_FILE_RESOURCE).getPath());
@@ -211,32 +210,4 @@ private void writeSpannerDataForSingers(int singerId, String firstName, String s
211210
.build();
212211
spannerResourceManager.write(m);
213212
}
214-
215-
private void createAndUploadShardConfigToGcs() throws IOException {
216-
Shard shard = new Shard();
217-
shard.setLogicalShardId("testShardA");
218-
shard.setUser(jdbcResourceManagerShardA.getUsername());
219-
shard.setHost(jdbcResourceManagerShardA.getHost());
220-
shard.setPassword(jdbcResourceManagerShardA.getPassword());
221-
shard.setPort(String.valueOf(jdbcResourceManagerShardA.getPort()));
222-
shard.setDbName(jdbcResourceManagerShardA.getDatabaseName());
223-
JsonObject jsObj = (JsonObject) new Gson().toJsonTree(shard).getAsJsonObject();
224-
jsObj.remove("secretManagerUri"); // remove field secretManagerUri
225-
226-
Shard shardB = new Shard();
227-
shardB.setLogicalShardId("testShardB");
228-
shardB.setUser(jdbcResourceManagerShardB.getUsername());
229-
shardB.setHost(jdbcResourceManagerShardB.getHost());
230-
shardB.setPassword(jdbcResourceManagerShardB.getPassword());
231-
shardB.setPort(String.valueOf(jdbcResourceManagerShardB.getPort()));
232-
shardB.setDbName(jdbcResourceManagerShardB.getDatabaseName());
233-
JsonObject jsObjB = (JsonObject) new Gson().toJsonTree(shardB).getAsJsonObject();
234-
jsObjB.remove("secretManagerUri"); // remove field secretManagerUri
235-
JsonArray ja = new JsonArray();
236-
ja.add(jsObj);
237-
ja.add(jsObjB);
238-
String shardFileContents = ja.toString();
239-
LOG.info("Shard file contents: {}", shardFileContents);
240-
gcsResourceManager.createArtifact("input/shard.json", shardFileContents);
241-
}
242213
}

0 commit comments

Comments
 (0)