Skip to content

Commit 604d785

Browse files
author
Komal Yadav
committed
Add SpannerMetadataModule with ExtensionLoader
Format Changes changes Changes name-changes formatting-changes elastic-changes changes Added copyright section Updated Updated Added Javadocs Corrected Copyright Corrected Copyright Lexicographical order correction Formatting correction Formatting correction changes Updated PR Updated names Updated comments Updated commit Formatting changes Formatting changes Formatting changes Formatting Changes Prefix Change Formatting Changes Formatting Changes Format changes
1 parent 60e58d4 commit 604d785

17 files changed

Lines changed: 540 additions & 3 deletions

File tree

cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2114,9 +2114,13 @@ public static final class Metadata {
21142114
public static final String STORAGE_PROVIDER_IMPLEMENTATION = "metadata.storage.implementation";
21152115
public static final String STORAGE_PROVIDER_NOSQL = "nosql";
21162116
public static final String STORAGE_PROVIDER_ELASTICSEARCH = "elastic";
2117+
public static final String STORAGE_PROVIDER_SPANNER = "gcp-spanner";
21172118

21182119
public static final String METADATA_WRITER_SUBSCRIBER = "metadata.writer";
21192120
public static final String METADATA_CONSUMER_WRITER_SUBSCRIBER = "metadata.consumer.writer";
2121+
2122+
// Metadata configs
2123+
public static final String METADATA_STORAGE_EXT_DIR = "metadata.storage.extensions.dir";
21202124
}
21212125

21222126
/**

cdap-common/src/main/resources/cdap-default.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2845,6 +2845,11 @@
28452845
<value>/opt/cdap/master/ext/log-publisher</value>
28462846
</property>
28472847

2848+
<property>
2849+
<name>metadata.storage.extensions.dir</name>
2850+
<value>/opt/cdap/master/ext/metadata-storage</value>
2851+
</property>
2852+
28482853
<!-- Metrics Configuration -->
28492854

28502855
<property>

cdap-data-fabric/src/main/java/io/cdap/cdap/data/runtime/DataSetsModules.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import io.cdap.cdap.data2.registry.UsageWriter;
4646
import io.cdap.cdap.metadata.elastic.ElasticsearchMetadataStorage;
4747
import io.cdap.cdap.security.impersonation.OwnerStore;
48+
import io.cdap.cdap.spi.metadata.DelegatingMetadataStorage;
4849
import io.cdap.cdap.spi.metadata.MetadataStorage;
4950
import io.cdap.cdap.spi.metadata.dataset.DatasetMetadataStorage;
5051
import io.cdap.cdap.spi.metadata.noop.NoopMetadataStorage;
@@ -178,11 +179,21 @@ public MetadataStorage get() {
178179
if (Constants.Metadata.STORAGE_PROVIDER_NOSQL.equalsIgnoreCase(config)) {
179180
return injector.getInstance(DatasetMetadataStorage.class);
180181
}
182+
183+
// TODO (CDAP-21173): Load elastic search implementation using DelegatingMetadataStorage
181184
if (Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH.equalsIgnoreCase(config)) {
182185
return injector.getInstance(ElasticsearchMetadataStorage.class);
183186
}
184-
throw new IllegalArgumentException("Unsupported MetadataStorage '" + config + "'. Only '"
185-
+ Constants.Metadata.STORAGE_PROVIDER_NOSQL + "' and '"
186-
+ Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH + "' are allowed.");
187+
if (Constants.Metadata.STORAGE_PROVIDER_SPANNER.equalsIgnoreCase(config)) {
188+
return injector.getInstance(DelegatingMetadataStorage.class);
189+
}
190+
String errorMessage = String.format(
191+
"Unsupported MetadataStorage '%s'. Only '%s', '%s' and '%s' are allowed.",
192+
config,
193+
Constants.Metadata.STORAGE_PROVIDER_NOSQL,
194+
Constants.Metadata.STORAGE_PROVIDER_SPANNER,
195+
Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH
196+
);
197+
throw new IllegalArgumentException(errorMessage);
187198
}
188199
}

cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/AuditMetadataStorage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ public void setAuditPublisher(AuditPublisher auditPublisher) {
8383
this.auditPublisher = auditPublisher;
8484
}
8585

86+
@Override
87+
public String getName() {
88+
return getClass().getSimpleName();
89+
}
90+
8691
@Override
8792
public void createIndex() throws IOException {
8893
try {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.spi.metadata;
18+
19+
import io.cdap.cdap.common.conf.CConfiguration;
20+
21+
import java.util.Collections;
22+
import java.util.Map;
23+
24+
// TODO(CDAP-21174): Create metadata storage specific properties
25+
26+
/**
27+
* Default implementation of the {@link MetadataStorageContext}.
28+
*/
29+
public class DefaultMetadataStorageContext implements MetadataStorageContext {
30+
31+
private final Map<String, String> properties;
32+
33+
DefaultMetadataStorageContext(CConfiguration cConf, String prefix) {
34+
this.properties = Collections.unmodifiableMap(cConf.getPropsWithPrefix(prefix));
35+
}
36+
37+
@Override
38+
public Map<String, String> getProperties() {
39+
return properties;
40+
}
41+
}
42+
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.spi.metadata;
18+
19+
import com.google.inject.Inject;
20+
import io.cdap.cdap.common.conf.CConfiguration;
21+
import io.cdap.cdap.common.conf.Constants;
22+
23+
import java.io.IOException;
24+
import java.util.List;
25+
26+
/**
27+
* Delegates {@link MetadataStorage} based on configured extension.
28+
*/
29+
public class DelegatingMetadataStorage implements MetadataStorage {
30+
private final CConfiguration cConf;
31+
private final MetadataStorage delegate;
32+
private static String prefix;
33+
34+
@Inject
35+
DelegatingMetadataStorage(CConfiguration cConf, MetadataStorageExtensionLoader extensionLoader) throws Exception {
36+
this.cConf = cConf;
37+
this.delegate = extensionLoader.get(getName());
38+
39+
if (this.delegate == null) {
40+
throw new IllegalArgumentException("Unsupported MetadataProvider type: " + getName());
41+
}
42+
// TODO(CDAP-21174): Create metadata storage specific properties
43+
if (getName().equals("gcp-spanner")) {
44+
this.prefix = String.format("%s%s.", Constants.Dataset.STORAGE_EXTENSION_PROPERTY_PREFIX, getName());
45+
}
46+
else {
47+
this.prefix = null;
48+
}
49+
50+
this.delegate.initialize(new DefaultMetadataStorageContext(cConf, prefix));
51+
}
52+
53+
@Override
54+
public void createIndex() throws IOException {
55+
delegate.createIndex();
56+
}
57+
58+
@Override
59+
public void close() {
60+
if (delegate != null) {
61+
delegate.close();
62+
}
63+
}
64+
65+
@Override
66+
public String getName() {
67+
return cConf.get(Constants.Metadata.STORAGE_PROVIDER_IMPLEMENTATION);
68+
}
69+
70+
@Override
71+
public void dropIndex() throws IOException {
72+
delegate.dropIndex();
73+
}
74+
75+
@Override
76+
public MetadataChange apply(MetadataMutation mutation, MutationOptions options) throws IOException {
77+
return delegate.apply(mutation, options);
78+
}
79+
80+
@Override
81+
public List<MetadataChange> batch(List<? extends MetadataMutation> mutations, MutationOptions options) throws IOException {
82+
return delegate.batch(mutations, options);
83+
}
84+
85+
@Override
86+
public Metadata read(Read read) throws IOException {
87+
return delegate.read(read);
88+
}
89+
90+
@Override
91+
public SearchResponse search(SearchRequest request) throws IOException {
92+
return delegate.search(request);
93+
}
94+
}
95+
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.spi.metadata;
18+
19+
import com.google.inject.Inject;
20+
import io.cdap.cdap.common.conf.CConfiguration;
21+
import io.cdap.cdap.common.conf.Constants;
22+
import io.cdap.cdap.common.lang.ClassPathResources;
23+
import io.cdap.cdap.common.lang.FilterClassLoader;
24+
import io.cdap.cdap.extension.AbstractExtensionLoader;
25+
26+
import java.io.IOException;
27+
import java.util.Collections;
28+
import java.util.Set;
29+
30+
/**
31+
* Extension loader for {@link MetadataStorage} implementations.
32+
*/
33+
public class MetadataStorageExtensionLoader extends AbstractExtensionLoader<String, MetadataStorage> {
34+
35+
private static final Set<String> ALLOWED_RESOURCES = createAllowedResources();
36+
private static final Set<String> ALLOWED_PACKAGES = createPackageSets(ALLOWED_RESOURCES);
37+
38+
@Inject
39+
public MetadataStorageExtensionLoader(CConfiguration cConf) {
40+
super(cConf.get(Constants.Metadata.METADATA_STORAGE_EXT_DIR));
41+
}
42+
43+
private static Set<String> createAllowedResources() {
44+
try {
45+
return ClassPathResources.getResourcesWithDependencies(MetadataStorage.class.getClassLoader(),
46+
MetadataStorage.class);
47+
} catch (IOException e) {
48+
throw new RuntimeException("Failed to trace dependencies for MetadataStorage extension.", e);
49+
}
50+
}
51+
52+
@Override
53+
protected Set<String> getSupportedTypesForProvider(MetadataStorage metadataStorage) {
54+
return Collections.singleton(metadataStorage.getName());
55+
}
56+
57+
@Override
58+
protected FilterClassLoader.Filter getExtensionParentClassLoaderFilter() {
59+
return new FilterClassLoader.Filter() {
60+
@Override
61+
public boolean acceptResource(String resource) {
62+
return ALLOWED_RESOURCES.contains(resource);
63+
}
64+
65+
@Override
66+
public boolean acceptPackage(String packageName) {
67+
return ALLOWED_PACKAGES.contains(packageName);
68+
}
69+
};
70+
}
71+
}

cdap-data-fabric/src/main/java/io/cdap/cdap/spi/metadata/dataset/DatasetMetadataStorage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ public class DatasetMetadataStorage extends SearchHelper implements MetadataStor
7474
super(txClient, tableDefinition);
7575
}
7676

77+
@Override
78+
public String getName() {
79+
return getClass().getSimpleName();
80+
}
81+
7782
@Override
7883
public void createIndex() throws IOException {
7984
createDatasets();

cdap-data-fabric/src/main/java/io/cdap/cdap/spi/metadata/noop/NoopMetadataStorage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@
3434
*/
3535
public class NoopMetadataStorage implements MetadataStorage {
3636

37+
@Override
38+
public String getName() {
39+
return getClass().getSimpleName();
40+
}
41+
3742
@Override
3843
public void createIndex() throws IOException {
3944
// no-op

cdap-elastic/src/main/java/io/cdap/cdap/metadata/elastic/ElasticsearchMetadataStorage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,11 @@ public ElasticsearchMetadataStorage(CConfiguration cConf, SConfiguration sConf)
217217
RetryStrategies.fixDelay(retrySleepMs, TimeUnit.MILLISECONDS));
218218
}
219219

220+
@Override
221+
public String getName() {
222+
return getClass().getSimpleName();
223+
}
224+
220225
@Override
221226
public void close() {
222227
Closeables.closeQuietly(client);

0 commit comments

Comments
 (0)