Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -70,6 +72,7 @@ public class IcebergResourceManager implements ResourceManager {
private final String catalogName;
private final Map<String, String> catalogProps;
private final Map<String, String> configProps;
private final List<String> createdNamespaces = Collections.synchronizedList(new ArrayList<>());

/**
* Creates a new IcebergResourceManager.
Expand Down Expand Up @@ -125,6 +128,7 @@ public boolean createNamespace(String namespace) {

try {
((SupportsNamespaces) catalog()).createNamespace(Namespace.of(components));
createdNamespaces.add(namespace);
return true;
} catch (AlreadyExistsException e) {
return false;
Expand Down Expand Up @@ -179,6 +183,8 @@ public boolean dropNamespace(String namespace, boolean cascade) {
catalog().listTables(ns).forEach(catalog()::dropTable);
}

createdNamespaces.remove(namespace);

// Drop the namespace
return ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(components));
}
Expand Down Expand Up @@ -213,7 +219,7 @@ public Table loadTable(String tableIdentifier) {
return catalog().loadTable(icebergIdentifier);
} catch (NoSuchTableException e) {
throw new IcebergResourceManagerException(
"No Such Table found with name" + tableIdentifier + "'.", e);
"No Such Table found with name '" + tableIdentifier + "'.", e);
}
}

Expand Down Expand Up @@ -286,10 +292,10 @@ public List<Record> read(String tableName) {
*/
@Override
public synchronized void cleanupAll() throws IcebergResourceManagerException {
Set<String> namespaces = listNamespaces();
for (String namespace : namespaces) {
for (String namespace : List.copyOf(createdNamespaces)) {
dropNamespace(namespace, true);
}
createdNamespaces.clear();
LOG.info("Cleaned up all resources for test ID: {}.", testId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.AppendFiles;
Expand Down Expand Up @@ -272,13 +271,11 @@ public void testWriteTableFails() throws IOException {

@Test
public void testCleanupAll() {
doReturn(List.of(Namespace.of(NAMESPACE_NAME)))
.when((SupportsNamespaces) catalog)
.listNamespaces();
doReturn(List.of(TABLE_IDENTIFIER)).when(catalog).listTables(any(Namespace.class));
doReturn(true).when((SupportsNamespaces) catalog).dropNamespace(any(Namespace.class));
doReturn(true).when((SupportsNamespaces) catalog).namespaceExists(any(Namespace.class));

testManager.createNamespace(NAMESPACE_NAME);
testManager.cleanupAll();

verify(catalog).listTables(Namespace.of(NAMESPACE_NAME));
Expand All @@ -287,12 +284,24 @@ public void testCleanupAll() {
}

@Test
public void testCleanupAllWhenNoNamespaces() {
doReturn(Collections.emptyList()).when((SupportsNamespaces) catalog).listNamespaces();
public void testCleanupAllOnlyDropsTrackedNamespaces() {
Namespace untracked = Namespace.of("untracked_ns");
doReturn(List.of(TABLE_IDENTIFIER)).when(catalog).listTables(any(Namespace.class));
doReturn(true).when((SupportsNamespaces) catalog).dropNamespace(any(Namespace.class));
doReturn(true).when((SupportsNamespaces) catalog).namespaceExists(any(Namespace.class));

testManager.createNamespace(NAMESPACE_NAME);
testManager.cleanupAll();

verify((SupportsNamespaces) catalog).dropNamespace(Namespace.of(NAMESPACE_NAME));
verify((SupportsNamespaces) catalog, never()).dropNamespace(untracked);
}

@Test
public void testCleanupAllWhenNoNamespaces() {
testManager.cleanupAll();

verify((SupportsNamespaces) catalog).listNamespaces();
verify((SupportsNamespaces) catalog, never()).listNamespaces();
verify(catalog, never()).listTables(any(Namespace.class));
verify((SupportsNamespaces) catalog, never()).dropNamespace(any(Namespace.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
import org.apache.beam.it.common.PipelineOperator;
Expand Down Expand Up @@ -59,24 +60,30 @@ public class IcebergToMySQLYamlIT extends TemplateTestBase {

// Iceberg Setup
private static final String CATALOG_NAME = "hadoop_catalog";
private static final String NAMESPACE = "iceberg_namespace";
private final String namespace =
"iceberg_namespace_" + UUID.randomUUID().toString().replace("-", "");
private static final String ICEBERG_TABLE_NAME = "iceberg_table";
private static final String ICEBERG_TABLE_IDENTIFIER = NAMESPACE + "." + ICEBERG_TABLE_NAME;
private final String icebergTableIdentifier = namespace + "." + ICEBERG_TABLE_NAME;

@Before
public void setUp() throws IOException {
mySQLResourceManager = MySQLResourceManager.builder(testName).build();

warehouseGcsResourceManager =
GcsResourceManager.builder(getClass().getSimpleName(), credentials).build();
warehouseGcsResourceManager.registerTempDir(NAMESPACE);
artifactBucketName != null && !artifactBucketName.isEmpty()
? GcsResourceManager.builder(
artifactBucketName, getClass().getSimpleName(), credentials)
.build()
: GcsResourceManager.builder(getClass().getSimpleName(), credentials).build();
warehouseGcsResourceManager.registerTempDir(namespace);
LOG.info("warehouse bucket created {}", warehouseGcsResourceManager.getBucket());

icebergResourceManager =
IcebergResourceManager.builder(testName)
.setCatalogName(CATALOG_NAME)
.setCatalogProperties(getCatalogProperties())
.build();
icebergResourceManager.createNamespace(namespace);
}

@After
Expand All @@ -91,8 +98,8 @@ public void testIcebergToMySQL() throws IOException {
// Iceberg setup

// Create namespace in the REST catalog
icebergResourceManager.createNamespace(NAMESPACE);
LOG.info("Namespace '{}' created successfully", NAMESPACE);
icebergResourceManager.createNamespace(namespace);
LOG.info("Namespace '{}' created successfully", namespace);

// Define Iceberg table schema
Schema icebergSchema =
Expand All @@ -102,15 +109,15 @@ public void testIcebergToMySQL() throws IOException {
Types.NestedField.optional(3, "active", Types.IntegerType.get()));

// Create Iceberg table
icebergResourceManager.createTable(ICEBERG_TABLE_IDENTIFIER, icebergSchema);
icebergResourceManager.createTable(icebergTableIdentifier, icebergSchema);

List<Map<String, Object>> icebergRecords =
List.of(
Map.of("id", 1, "name", "Alice", "active", 1),
Map.of("id", 2, "name", "Bob", "active", 0),
Map.of("id", 3, "name", "Charlie", "active", 1));

icebergResourceManager.write(ICEBERG_TABLE_IDENTIFIER, icebergRecords);
icebergResourceManager.write(icebergTableIdentifier, icebergRecords);
LOG.info("Iceberg source table populated with {} records", icebergRecords.size());

// MySQL setup
Expand All @@ -125,7 +132,7 @@ public void testIcebergToMySQL() throws IOException {

LaunchConfig.Builder options =
LaunchConfig.builder(testName, specPath)
.addParameter("table", ICEBERG_TABLE_IDENTIFIER)
.addParameter("table", icebergTableIdentifier)
.addParameter("catalogName", CATALOG_NAME)
.addParameter(
"catalogProperties", new org.json.JSONObject(getCatalogProperties()).toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
Expand Down Expand Up @@ -57,9 +58,10 @@ public class IcebergToPostgreSQLYamlIT extends TemplateTestBase {

// Iceberg Setup
private static final String CATALOG_NAME = "hadoop_catalog";
private static final String NAMESPACE = "iceberg_namespace";
private final String namespace =
"iceberg_namespace_" + UUID.randomUUID().toString().replace("-", "");
private static final String ICEBERG_TABLE_NAME = "source_table";
private static final String ICEBERG_TABLE_IDENTIFIER = NAMESPACE + "." + ICEBERG_TABLE_NAME;
private final String icebergTableIdentifier = namespace + "." + ICEBERG_TABLE_NAME;

// Postgres Setup
private static final String POSTGRES_TABLE_NAME = "target_table";
Expand All @@ -75,8 +77,12 @@ public void setUp() throws IOException {

// Initialize GCS for Iceberg warehouse
warehouseGcsResourceManager =
GcsResourceManager.builder(getClass().getSimpleName(), credentials).build();
warehouseGcsResourceManager.registerTempDir(NAMESPACE);
artifactBucketName != null && !artifactBucketName.isEmpty()
? GcsResourceManager.builder(
artifactBucketName, getClass().getSimpleName(), credentials)
.build()
: GcsResourceManager.builder(getClass().getSimpleName(), credentials).build();
warehouseGcsResourceManager.registerTempDir(namespace);
LOG.info("Warehouse bucket created: {}", warehouseGcsResourceManager.getBucket());

// Initialize Iceberg resource manager
Expand All @@ -85,6 +91,7 @@ public void setUp() throws IOException {
.setCatalogName(CATALOG_NAME)
.setCatalogProperties(getCatalogProperties())
.build();
icebergResourceManager.createNamespace(namespace);
}

@After
Expand All @@ -98,8 +105,8 @@ public void testIcebergToPostgres() throws IOException {
// Iceberg setup

// Create namespace in the REST catalog
icebergResourceManager.createNamespace(NAMESPACE);
LOG.info("Namespace '{}' created successfully", NAMESPACE);
icebergResourceManager.createNamespace(namespace);
LOG.info("Namespace '{}' created successfully", namespace);

// Define Iceberg table schema
Schema icebergSchema =
Expand All @@ -109,15 +116,15 @@ public void testIcebergToPostgres() throws IOException {
Types.NestedField.optional(3, "active", Types.IntegerType.get()));

// Create Iceberg table
icebergResourceManager.createTable(ICEBERG_TABLE_IDENTIFIER, icebergSchema);
icebergResourceManager.createTable(icebergTableIdentifier, icebergSchema);

List<Map<String, Object>> icebergRecords =
List.of(
Map.of("id", 1, "name", "Alice", "active", 1),
Map.of("id", 2, "name", "Bob", "active", 0),
Map.of("id", 3, "name", "Charlie", "active", 1));

icebergResourceManager.write(ICEBERG_TABLE_IDENTIFIER, icebergRecords);
icebergResourceManager.write(icebergTableIdentifier, icebergRecords);
LOG.info("Iceberg source table populated with {} records", icebergRecords.size());

// Postgres setup
Expand All @@ -133,7 +140,7 @@ public void testIcebergToPostgres() throws IOException {
// Pipeline execution
LaunchConfig.Builder options =
LaunchConfig.builder(testName, specPath)
.addParameter("table", ICEBERG_TABLE_IDENTIFIER)
.addParameter("table", icebergTableIdentifier)
.addParameter("catalogName", CATALOG_NAME)
.addParameter(
"catalogProperties", new org.json.JSONObject(getCatalogProperties()).toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
Expand Down Expand Up @@ -57,9 +58,10 @@ public class IcebergToSQLServerYamlIT extends TemplateTestBase {

// Iceberg Setup
private static final String CATALOG_NAME = "hadoop_catalog";
private static final String NAMESPACE = "iceberg_namespace";
private final String namespace =
"iceberg_namespace_" + UUID.randomUUID().toString().replace("-", "");
private static final String ICEBERG_TABLE_NAME = "source_table";
private static final String ICEBERG_TABLE_IDENTIFIER = NAMESPACE + "." + ICEBERG_TABLE_NAME;
private final String icebergTableIdentifier = namespace + "." + ICEBERG_TABLE_NAME;

// SQL Server Setup
private static final String SQLSERVER_TABLE_NAME = "target_table";
Expand All @@ -75,8 +77,12 @@ public void setUp() throws IOException {

// Initialize GCS for Iceberg warehouse
warehouseGcsResourceManager =
GcsResourceManager.builder(getClass().getSimpleName(), credentials).build();
warehouseGcsResourceManager.registerTempDir(NAMESPACE);
artifactBucketName != null && !artifactBucketName.isEmpty()
? GcsResourceManager.builder(
artifactBucketName, getClass().getSimpleName(), credentials)
.build()
: GcsResourceManager.builder(getClass().getSimpleName(), credentials).build();
warehouseGcsResourceManager.registerTempDir(namespace);
LOG.info("Warehouse bucket created: {}", warehouseGcsResourceManager.getBucket());

// Initialize Iceberg resource manager
Expand All @@ -85,6 +91,7 @@ public void setUp() throws IOException {
.setCatalogName(CATALOG_NAME)
.setCatalogProperties(getCatalogProperties())
.build();
icebergResourceManager.createNamespace(namespace);
}

@After
Expand All @@ -98,8 +105,8 @@ public void testIcebergToSQLServer() throws IOException {
// Iceberg setup

// Create namespace in the REST catalog
icebergResourceManager.createNamespace(NAMESPACE);
LOG.info("Namespace '{}' created successfully", NAMESPACE);
icebergResourceManager.createNamespace(namespace);
LOG.info("Namespace '{}' created successfully", namespace);

// Define Iceberg table schema
Schema icebergSchema =
Expand All @@ -109,15 +116,15 @@ public void testIcebergToSQLServer() throws IOException {
Types.NestedField.optional(3, "active", Types.IntegerType.get()));

// Create Iceberg table
icebergResourceManager.createTable(ICEBERG_TABLE_IDENTIFIER, icebergSchema);
icebergResourceManager.createTable(icebergTableIdentifier, icebergSchema);

List<Map<String, Object>> icebergRecords =
List.of(
Map.of("id", 1, "name", "Alice", "active", 1),
Map.of("id", 2, "name", "Bob", "active", 0),
Map.of("id", 3, "name", "Charlie", "active", 1));

icebergResourceManager.write(ICEBERG_TABLE_IDENTIFIER, icebergRecords);
icebergResourceManager.write(icebergTableIdentifier, icebergRecords);
LOG.info("Iceberg source table populated with {} records", icebergRecords.size());

// SQL Server setup
Expand All @@ -133,7 +140,7 @@ public void testIcebergToSQLServer() throws IOException {
// Pipeline execution
LaunchConfig.Builder options =
LaunchConfig.builder(testName, specPath)
.addParameter("table", ICEBERG_TABLE_IDENTIFIER)
.addParameter("table", icebergTableIdentifier)
.addParameter("catalogName", CATALOG_NAME)
.addParameter(
"catalogProperties", new org.json.JSONObject(getCatalogProperties()).toString())
Expand All @@ -144,7 +151,7 @@ public void testIcebergToSQLServer() throws IOException {

// FOR INTEGRATION TESTS DEBUGGING PURPOSE: Logging the configuration parameters
LOG.info("=== Pipeline Parameters ===");
LOG.info("table: {}", ICEBERG_TABLE_IDENTIFIER);
LOG.info("table: {}", icebergTableIdentifier);
LOG.info("catalogName: {}", CATALOG_NAME);
LOG.info("catalogProperties: {}", new org.json.JSONObject(getCatalogProperties()).toString());
LOG.info("jdbcUrl: {}", mssqlResourceManager.getUri());
Expand Down
Loading
Loading