Skip to content

Commit 99150c5

Browse files
authored
[Cloud_Spanner_to_GCS_Avro] Refactoring InformationSchemaScannerIT (#3915)
* Corrected and optimized InformationSchemaScannerIT * Formatting changes * Correcting SpannerResourceManagerTest * Formatting changes * Correcting build failure * Fixing a flaky test - MutationKeyEncoderTest * Addressing comments
1 parent aed1104 commit 99150c5

6 files changed

Lines changed: 1631 additions & 1347 deletions

File tree

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java

Lines changed: 136 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323
import static org.apache.beam.it.gcp.spanner.utils.SpannerResourceManagerUtils.generateInstanceId;
2424

2525
import com.google.auth.Credentials;
26+
import com.google.cloud.spanner.BatchClient;
2627
import com.google.cloud.spanner.Database;
2728
import com.google.cloud.spanner.DatabaseAdminClient;
2829
import com.google.cloud.spanner.DatabaseClient;
2930
import com.google.cloud.spanner.DatabaseId;
31+
import com.google.cloud.spanner.DatabaseInfo;
3032
import com.google.cloud.spanner.Dialect;
3133
import com.google.cloud.spanner.Instance;
3234
import com.google.cloud.spanner.InstanceAdminClient;
@@ -49,7 +51,11 @@
4951
import com.google.monitoring.v3.Aggregation.Aligner;
5052
import com.google.monitoring.v3.TimeInterval;
5153
import com.google.protobuf.Timestamp;
54+
import com.google.spanner.admin.instance.v1.CreateInstancePartitionRequest;
5255
import com.google.spanner.admin.instance.v1.Instance.Edition;
56+
import com.google.spanner.admin.instance.v1.InstanceConfigName;
57+
import com.google.spanner.admin.instance.v1.InstanceName;
58+
import com.google.spanner.admin.instance.v1.InstancePartition;
5359
import dev.failsafe.Failsafe;
5460
import dev.failsafe.RetryPolicy;
5561
import java.time.Duration;
@@ -100,6 +106,11 @@ public final class SpannerResourceManager implements ResourceManager {
100106

101107
private boolean hasInstance = false;
102108
private boolean hasDatabase = false;
109+
private final String instancePartitionId;
110+
private final String instancePartitionConfig;
111+
private final com.google.cloud.spanner.admin.instance.v1.InstanceAdminClient
112+
v1InstanceAdminClient;
113+
private boolean hasInstancePartition = false;
103114

104115
private final String projectId;
105116
private final String instanceId;
@@ -114,6 +125,7 @@ public final class SpannerResourceManager implements ResourceManager {
114125
private final InstanceAdminClient instanceAdminClient;
115126
private final DatabaseAdminClient databaseAdminClient;
116127
private final int nodeCount;
128+
private final byte[] protoDescriptors;
117129
private Timestamp startTime;
118130
private MonitoringClient monitoringClient;
119131
private final boolean suppressVerboseLogs;
@@ -164,7 +176,11 @@ private SpannerResourceManager(Builder builder) {
164176
this.instanceAdminClient = spanner.getInstanceAdminClient();
165177
this.databaseAdminClient = spanner.getDatabaseAdminClient();
166178
this.nodeCount = builder.nodeCount;
179+
this.protoDescriptors = builder.protoDescriptors;
167180
this.monitoringClient = builder.monitoringClient;
181+
this.instancePartitionId = builder.instancePartitionId;
182+
this.instancePartitionConfig = builder.instancePartitionConfig;
183+
this.v1InstanceAdminClient = spanner.createInstanceAdminClient();
168184
}
169185

170186
public static Builder builder(String testId, String projectId, String region) {
@@ -181,33 +197,62 @@ private synchronized void maybeCreateInstance() {
181197
if (usingStaticInstance) {
182198
LOG.info("Not creating Spanner instance - reusing static {}", instanceId);
183199
hasInstance = true;
184-
return;
185-
}
186-
187-
if (hasInstance) {
188-
return;
200+
} else if (!hasInstance) {
201+
LOG.info("Creating instance {} in project {}.", instanceId, projectId);
202+
try {
203+
InstanceInfo instanceInfo =
204+
InstanceInfo.newBuilder(InstanceId.of(projectId, instanceId))
205+
.setInstanceConfigId(InstanceConfigId.of(projectId, "regional-" + region))
206+
.setDisplayName(instanceId)
207+
.setEdition(Edition.ENTERPRISE_PLUS) // Needed by Full Text Search.
208+
.setNodeCount(nodeCount)
209+
.build();
210+
211+
// Retry creation if there's a quota error
212+
Instance instance =
213+
Failsafe.with(
214+
retryOnQuotaException(5, Duration.ofMinutes(1), Duration.ofMinutes(2), 0.5))
215+
.get(() -> instanceAdminClient.createInstance(instanceInfo).get());
216+
217+
hasInstance = true;
218+
LOG.info("Successfully created instance {}: {}.", instanceId, instance.getState());
219+
} catch (Exception e) {
220+
cleanupAll();
221+
throw new SpannerResourceManagerException("Failed to create instance.", e);
222+
}
189223
}
190224

191-
LOG.info("Creating instance {} in project {}.", instanceId, projectId);
192-
try {
193-
InstanceInfo instanceInfo =
194-
InstanceInfo.newBuilder(InstanceId.of(projectId, instanceId))
195-
.setInstanceConfigId(InstanceConfigId.of(projectId, "regional-" + region))
196-
.setDisplayName(instanceId)
197-
.setEdition(Edition.ENTERPRISE_PLUS) // Needed by Full Text Search.
198-
.setNodeCount(nodeCount)
199-
.build();
200-
201-
// Retry creation if there's a quota error
202-
Instance instance =
203-
Failsafe.with(retryOnQuotaException(5, Duration.ofMinutes(1), Duration.ofMinutes(2), 0.5))
204-
.get(() -> instanceAdminClient.createInstance(instanceInfo).get());
205-
206-
hasInstance = true;
207-
LOG.info("Successfully created instance {}: {}.", instanceId, instance.getState());
208-
} catch (Exception e) {
209-
cleanupAll();
210-
throw new SpannerResourceManagerException("Failed to create instance.", e);
225+
if (!hasInstancePartition && instancePartitionId != null && instancePartitionConfig != null) {
226+
LOG.info(
227+
"Creating instance partition {} with config {}",
228+
instancePartitionId,
229+
instancePartitionConfig);
230+
try {
231+
InstancePartition instancePartition =
232+
InstancePartition.newBuilder()
233+
.setDisplayName(instancePartitionId)
234+
.setNodeCount(1)
235+
.setConfig(InstanceConfigName.of(projectId, instancePartitionConfig).toString())
236+
.build();
237+
v1InstanceAdminClient
238+
.createInstancePartitionAsync(
239+
CreateInstancePartitionRequest.newBuilder()
240+
.setParent(InstanceName.of(projectId, instanceId).toString())
241+
.setInstancePartitionId(instancePartitionId)
242+
.setInstancePartition(instancePartition)
243+
.build())
244+
.get();
245+
hasInstancePartition = true;
246+
LOG.info("Successfully created instance partition {}.", instancePartitionId);
247+
} catch (Exception e) {
248+
if (e.getMessage() != null && e.getMessage().contains("ALREADY_EXISTS")) {
249+
LOG.info("Instance partition {} already exists.", instancePartitionId);
250+
hasInstancePartition = true;
251+
} else {
252+
cleanupAll();
253+
throw new SpannerResourceManagerException("Failed to create instance partition.", e);
254+
}
255+
}
211256
}
212257
}
213258

@@ -223,16 +268,18 @@ private synchronized void maybeCreateDatabase() {
223268
Database database =
224269
Failsafe.with(retryOnQuotaException())
225270
.get(
226-
() ->
227-
databaseAdminClient
228-
.createDatabase(
229-
databaseAdminClient
230-
.newDatabaseBuilder(
231-
DatabaseId.of(projectId, instanceId, databaseId))
232-
.setDialect(dialect)
233-
.build(),
234-
ImmutableList.of())
235-
.get());
271+
() -> {
272+
DatabaseInfo.Builder dbBuilder =
273+
databaseAdminClient
274+
.newDatabaseBuilder(DatabaseId.of(projectId, instanceId, databaseId))
275+
.setDialect(dialect);
276+
if (protoDescriptors != null) {
277+
dbBuilder.setProtoDescriptors(protoDescriptors);
278+
}
279+
return databaseAdminClient
280+
.createDatabase(dbBuilder.build(), ImmutableList.of())
281+
.get();
282+
});
236283

237284
hasDatabase = true;
238285
LOG.info("Successfully created database {}: {}.", databaseId, database.getState());
@@ -327,6 +374,10 @@ public synchronized void executeDdlStatements(List<String> statements)
327374
throws IllegalStateException {
328375
ensureUsableAndCreateResources();
329376

377+
if (statements.isEmpty()) {
378+
return;
379+
}
380+
330381
if (suppressVerboseLogs) {
331382
LOG.info("Executing {} DDL statements on database {}.", statements.size(), databaseId);
332383
} else {
@@ -336,11 +387,18 @@ public synchronized void executeDdlStatements(List<String> statements)
336387
// executeDdlStatments can fail for spanner staging because of failfast.
337388
Failsafe.with(retryOnQuotaException())
338389
.run(
339-
() ->
340-
databaseAdminClient
341-
.updateDatabaseDdl(
342-
instanceId, databaseId, statements, /* operationId= */ null)
343-
.get());
390+
() -> {
391+
DatabaseInfo.Builder dbBuilder =
392+
databaseAdminClient.newDatabaseBuilder(
393+
DatabaseId.of(projectId, instanceId, databaseId));
394+
if (protoDescriptors != null) {
395+
dbBuilder.setProtoDescriptors(protoDescriptors);
396+
}
397+
Database database = dbBuilder.build();
398+
databaseAdminClient
399+
.updateDatabaseDdl(database, statements, /* operationId= */ null)
400+
.get();
401+
});
344402
if (suppressVerboseLogs) {
345403
LOG.info(
346404
"Successfully executed {} DDL statements on database {}.",
@@ -372,6 +430,17 @@ public synchronized DatabaseClient getDatabaseClient() {
372430
return spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
373431
}
374432

433+
/**
434+
* Creates and returns Spanner Batch Client.
435+
*
436+
* @return Spanner Batch Client
437+
*/
438+
public synchronized BatchClient getBatchClient() {
439+
checkIsUsable();
440+
checkHasInstanceAndDatabase();
441+
return spanner.getBatchClient(DatabaseId.of(projectId, instanceId, databaseId));
442+
}
443+
375444
/**
376445
* Writes a given record into a table. This method requires {@link
377446
* SpannerResourceManager#executeDdlStatement(String)} to be called for the target table
@@ -583,7 +652,9 @@ public synchronized void cleanupAll() {
583652
Failsafe.with(retryOnQuotaException())
584653
.run(() -> databaseAdminClient.dropDatabase(instanceId, databaseId));
585654
}
655+
586656
} else {
657+
587658
LOG.info("Deleting instance {}...", instanceId);
588659

589660
if (instanceAdminClient != null) {
@@ -597,6 +668,10 @@ public synchronized void cleanupAll() {
597668
} catch (SpannerException e) {
598669
throw new SpannerResourceManagerException("Failed to delete instance.", e);
599670
} finally {
671+
if (v1InstanceAdminClient != null && !v1InstanceAdminClient.isShutdown()) {
672+
v1InstanceAdminClient.shutdown();
673+
}
674+
600675
if (!spanner.isClosed()) {
601676
spanner.close();
602677
}
@@ -663,8 +738,11 @@ public static final class Builder {
663738
private Credentials credentials;
664739
private String host;
665740
private int nodeCount;
741+
private byte[] protoDescriptors;
666742
private MonitoringClient monitoringClient;
667743
private boolean suppressVerboseLogs;
744+
private String instancePartitionId;
745+
private String instancePartitionConfig;
668746

669747
private Builder(String testId, String projectId, String region, Dialect dialect) {
670748
this.testId = testId;
@@ -677,6 +755,13 @@ private Builder(String testId, String projectId, String region, Dialect dialect)
677755
this.nodeCount = 1;
678756
}
679757

758+
public Builder setInstancePartition(
759+
String instancePartitionId, String instancePartitionConfig) {
760+
this.instancePartitionId = instancePartitionId;
761+
this.instancePartitionConfig = instancePartitionConfig;
762+
return this;
763+
}
764+
680765
public Builder setCredentials(Credentials credentials) {
681766
this.credentials = credentials;
682767
return this;
@@ -751,6 +836,17 @@ public Builder setNodeCount(int nodeCount) {
751836
return this;
752837
}
753838

839+
/**
840+
* Configures proto descriptors for database creation.
841+
*
842+
* @param protoDescriptors
843+
* @return
844+
*/
845+
public Builder setProtoDescriptors(byte[] protoDescriptors) {
846+
this.protoDescriptors = protoDescriptors;
847+
return this;
848+
}
849+
754850
/**
755851
* Sets Monitoring Client instance to be used for getMetrics method.
756852
*

it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public final class SpannerResourceManagerTest {
9494
@Captor private ArgumentCaptor<Iterable<String>> statementCaptor;
9595
@Captor private ArgumentCaptor<String> instanceIdCaptor;
9696
@Captor private ArgumentCaptor<String> databaseIdCaptor;
97+
@Captor private ArgumentCaptor<Database> databaseCaptor;
9798
@Captor private ArgumentCaptor<String> projectIdCaptor;
9899

99100
@Before
@@ -153,7 +154,10 @@ public void testExecuteDdlStatementShouldThrowExceptionWhenSpannerUpdateDatabase
153154
// arrange
154155
prepareCreateInstanceMock();
155156
prepareCreateDatabaseMock();
156-
when(spanner.getDatabaseAdminClient().updateDatabaseDdl(any(), any(), any(), any()).get())
157+
when(spanner
158+
.getDatabaseAdminClient()
159+
.updateDatabaseDdl(any(Database.class), any(), any())
160+
.get())
157161
.thenThrow(InterruptedException.class);
158162
String statement =
159163
"CREATE TABLE Singers (\n"
@@ -190,14 +194,10 @@ public void testExecuteDdlStatementShouldWorkWhenSpannerDoesntThrowAnyError()
190194
verify(spanner.getInstanceAdminClient(), times(2)).createInstance(any());
191195
verify(spanner.getDatabaseAdminClient(), times(2)).createDatabase(any(), any());
192196
verify(spanner.getDatabaseAdminClient(), times(2))
193-
.updateDatabaseDdl(
194-
instanceIdCaptor.capture(),
195-
databaseIdCaptor.capture(),
196-
statementCaptor.capture(),
197-
any());
198-
199-
String actualInstanceId = instanceIdCaptor.getValue();
200-
String actualDatabaseId = databaseIdCaptor.getValue();
197+
.updateDatabaseDdl(databaseCaptor.capture(), statementCaptor.capture(), any());
198+
199+
String actualInstanceId = testManager.getInstanceId();
200+
String actualDatabaseId = testManager.getDatabaseId();
201201
Iterable<String> actualStatement = statementCaptor.getValue();
202202

203203
assertThat(actualInstanceId).matches(TEST_ID + "-\\d{8}-\\d{6}-[a-zA-Z0-9]{6}");
@@ -222,7 +222,10 @@ public void testExecuteDdlStatementShouldRetryOnResourceExhaustedError()
222222
RuntimeException resourceExhaustedException =
223223
new RuntimeException(
224224
"com.google.cloud.spanner.SpannerException: RESOURCE_EXHAUSTED: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: CPU overload detected");
225-
when(spanner.getDatabaseAdminClient().updateDatabaseDdl(any(), any(), any(), any()).get())
225+
when(spanner
226+
.getDatabaseAdminClient()
227+
.updateDatabaseDdl(any(Database.class), any(), any())
228+
.get())
226229
.thenThrow(resourceExhaustedException)
227230
.thenReturn(null);
228231

@@ -234,14 +237,10 @@ public void testExecuteDdlStatementShouldRetryOnResourceExhaustedError()
234237
verify(spanner.getInstanceAdminClient(), times(2)).createInstance(any());
235238
verify(spanner.getDatabaseAdminClient(), times(2)).createDatabase(any(), any());
236239
verify(spanner.getDatabaseAdminClient(), times(3))
237-
.updateDatabaseDdl(
238-
instanceIdCaptor.capture(),
239-
databaseIdCaptor.capture(),
240-
statementCaptor.capture(),
241-
any());
242-
243-
String actualInstanceId = instanceIdCaptor.getValue();
244-
String actualDatabaseId = databaseIdCaptor.getValue();
240+
.updateDatabaseDdl(databaseCaptor.capture(), statementCaptor.capture(), any());
241+
242+
String actualInstanceId = testManager.getInstanceId();
243+
String actualDatabaseId = testManager.getDatabaseId();
245244
Iterable<String> actualStatement = statementCaptor.getValue();
246245

247246
assertThat(actualInstanceId).matches(TEST_ID + "-\\d{8}-\\d{6}-[a-zA-Z0-9]{6}");
@@ -826,7 +825,11 @@ private void prepareCreateDatabaseMock() throws ExecutionException, InterruptedE
826825

827826
private void prepareUpdateDatabaseMock() throws ExecutionException, InterruptedException {
828827
Mockito.lenient()
829-
.when(spanner.getDatabaseAdminClient().updateDatabaseDdl(any(), any(), any(), any()).get())
828+
.when(
829+
spanner
830+
.getDatabaseAdminClient()
831+
.updateDatabaseDdl(any(Database.class), any(), any())
832+
.get())
830833
.thenReturn(null);
831834
}
832835

v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,9 +1336,9 @@ private void listPropertyGraphTables(Ddl.Builder builder, String tableType) {
13361336
String tablesJson;
13371337
try {
13381338
tablesJson = resultSet.getJson(2);
1339-
} catch (Exception edgeTableException) {
1340-
LOG.debug(propertyGraphNameQualified + " does not contain any edge tables");
1341-
return;
1339+
} catch (Exception tableException) {
1340+
LOG.debug(propertyGraphNameQualified + " does not contain any {}", tableType);
1341+
continue;
13421342
}
13431343

13441344
LOG.debug("Schema PropertyGraph {}", propertyGraphNameQualified);

v1/src/main/java/com/google/cloud/teleport/spanner/spannerio/MutationKeyEncoder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ public MutationKeyEncoder(SpannerSchema schema) {
5959
this.schema = schema;
6060
}
6161

62+
@VisibleForTesting
63+
static void clearUnknownTablesWarningsMap() {
64+
unknownTablesWarnings.clear();
65+
}
66+
6267
/**
6368
* Builds a lexicographically sortable binary key based on a primary key descriptor.
6469
*

0 commit comments

Comments
 (0)