Skip to content

Commit 2bf40f3

Browse files
author
Komal Yadav
committed
Add SpannerMetadataModule with ExtensionLoader
Format Changes changes Changes name-changes formatting-changes elastic-changes changes Added copyright section Updated Updated Added Javadocs Corrected Copyright Corrected Copyright Lexicographical order correction Formatting correction Formatting correction changes
1 parent 60e58d4 commit 2bf40f3

25 files changed

Lines changed: 688 additions & 26 deletions

File tree

cdap-app-fabric/src/main/java/io/cdap/cdap/metadata/MetadataHttpHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@
3131
import io.cdap.cdap.common.security.AuditDetail;
3232
import io.cdap.cdap.common.security.AuditPolicy;
3333
import io.cdap.cdap.data2.metadata.MetadataCompatibility;
34-
import io.cdap.cdap.metadata.elastic.ScopedNameOfKindTypeAdapter;
35-
import io.cdap.cdap.metadata.elastic.ScopedNameTypeAdapter;
3634
import io.cdap.cdap.proto.EntityScope;
3735
import io.cdap.cdap.proto.ProgramType;
3836
import io.cdap.cdap.proto.codec.NamespacedEntityIdCodec;
@@ -52,6 +50,8 @@
5250
import io.cdap.cdap.spi.metadata.MutationOptions;
5351
import io.cdap.cdap.spi.metadata.ScopedName;
5452
import io.cdap.cdap.spi.metadata.ScopedNameOfKind;
53+
import io.cdap.cdap.spi.metadata.ScopedNameOfKindTypeAdapter;
54+
import io.cdap.cdap.spi.metadata.ScopedNameTypeAdapter;
5555
import io.cdap.cdap.spi.metadata.SearchRequest;
5656
import io.cdap.cdap.spi.metadata.SearchResponse;
5757
import io.cdap.cdap.spi.metadata.Sorting;

cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2114,9 +2114,13 @@ public static final class Metadata {
21142114
public static final String STORAGE_PROVIDER_IMPLEMENTATION = "metadata.storage.implementation";
21152115
public static final String STORAGE_PROVIDER_NOSQL = "nosql";
21162116
public static final String STORAGE_PROVIDER_ELASTICSEARCH = "elastic";
2117+
public static final String STORAGE_PROVIDER_SPANNER = "spanner";
21172118

21182119
public static final String METADATA_WRITER_SUBSCRIBER = "metadata.writer";
21192120
public static final String METADATA_CONSUMER_WRITER_SUBSCRIBER = "metadata.consumer.writer";
2121+
2122+
// Metadata configs
2123+
public static final String METADATA_STORAGE_EXT_DIR = "metadata.storage.extensions.dir";
21202124
}
21212125

21222126
/**

cdap-common/src/main/resources/cdap-default.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2845,6 +2845,11 @@
28452845
<value>/opt/cdap/master/ext/log-publisher</value>
28462846
</property>
28472847

2848+
<property>
2849+
<name>metadata.storage.extensions.dir</name>
2850+
<value>/opt/cdap/master/ext/metadata-storage</value>
2851+
</property>
2852+
28482853
<!-- Metrics Configuration -->
28492854

28502855
<property>

cdap-data-fabric/src/main/java/io/cdap/cdap/data/runtime/DataSetsModules.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import io.cdap.cdap.data2.registry.UsageWriter;
4646
import io.cdap.cdap.metadata.elastic.ElasticsearchMetadataStorage;
4747
import io.cdap.cdap.security.impersonation.OwnerStore;
48+
import io.cdap.cdap.spi.metadata.DelegatingMetadataStorage;
4849
import io.cdap.cdap.spi.metadata.MetadataStorage;
4950
import io.cdap.cdap.spi.metadata.dataset.DatasetMetadataStorage;
5051
import io.cdap.cdap.spi.metadata.noop.NoopMetadataStorage;
@@ -181,8 +182,12 @@ public MetadataStorage get() {
181182
if (Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH.equalsIgnoreCase(config)) {
182183
return injector.getInstance(ElasticsearchMetadataStorage.class);
183184
}
185+
if (Constants.Metadata.STORAGE_PROVIDER_SPANNER.equalsIgnoreCase(config)) {
186+
return injector.getInstance(DelegatingMetadataStorage.class);
187+
}
184188
throw new IllegalArgumentException("Unsupported MetadataStorage '" + config + "'. Only '"
185-
+ Constants.Metadata.STORAGE_PROVIDER_NOSQL + "' and '"
189+
+ Constants.Metadata.STORAGE_PROVIDER_NOSQL + "','"
190+
+ Constants.Metadata.STORAGE_PROVIDER_SPANNER + "' and '"
186191
+ Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH + "' are allowed.");
187192
}
188193
}

cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/AuditMetadataStorage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ public void setAuditPublisher(AuditPublisher auditPublisher) {
8383
this.auditPublisher = auditPublisher;
8484
}
8585

86+
@Override
87+
public String getName() {
88+
return getClass().getSimpleName();
89+
}
90+
8691
@Override
8792
public void createIndex() throws IOException {
8893
try {

cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/writer/DefaultMetadataServiceClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import io.cdap.cdap.common.http.DefaultHttpRequestConfig;
2323
import io.cdap.cdap.common.internal.remote.RemoteClient;
2424
import io.cdap.cdap.common.internal.remote.RemoteClientFactory;
25-
import io.cdap.cdap.metadata.elastic.ScopedNameOfKindTypeAdapter;
26-
import io.cdap.cdap.metadata.elastic.ScopedNameTypeAdapter;
2725
import io.cdap.cdap.proto.codec.NamespacedEntityIdCodec;
2826
import io.cdap.cdap.proto.id.NamespacedEntityId;
2927
import io.cdap.cdap.spi.metadata.Metadata;
@@ -32,6 +30,8 @@
3230
import io.cdap.cdap.spi.metadata.MetadataMutationCodec;
3331
import io.cdap.cdap.spi.metadata.ScopedName;
3432
import io.cdap.cdap.spi.metadata.ScopedNameOfKind;
33+
import io.cdap.cdap.spi.metadata.ScopedNameOfKindTypeAdapter;
34+
import io.cdap.cdap.spi.metadata.ScopedNameTypeAdapter;
3535
import io.cdap.common.http.HttpMethod;
3636
import io.cdap.common.http.HttpRequest;
3737
import io.cdap.common.http.HttpResponse;
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.spi.metadata;
18+
19+
import io.cdap.cdap.common.conf.CConfiguration;
20+
import io.cdap.cdap.common.conf.Constants;
21+
import java.util.Collections;
22+
import java.util.Map;
23+
24+
/**
25+
* Default implementation of the {@link MetadataStorageContext}.
26+
*/
27+
public class DefaultMetadataStorageContext implements MetadataStorageContext {
28+
29+
private static final String storageImpl = "gcp-spanner";
30+
private final Map<String, String> properties;
31+
32+
protected DefaultMetadataStorageContext(CConfiguration cConf, String storageName) {
33+
String propertiesPrefix =
34+
Constants.Dataset.STORAGE_EXTENSION_PROPERTY_PREFIX + storageImpl + ".";
35+
this.properties = Collections.unmodifiableMap(cConf.getPropsWithPrefix(propertiesPrefix));
36+
}
37+
38+
@Override
39+
public Map<String, String> getProperties() {
40+
return properties;
41+
}
42+
}
43+
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.spi.metadata;
18+
19+
import com.google.inject.Inject;
20+
import io.cdap.cdap.common.conf.CConfiguration;
21+
import io.cdap.cdap.common.conf.Constants;
22+
import java.io.IOException;
23+
import java.util.List;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
/**
28+
* Delegates {@link io.cdap.cdap.spi.metadata.MetadataStorage} based on configured extension.
29+
*/
30+
public class DelegatingMetadataStorage implements MetadataStorage {
31+
private static final Logger LOG = LoggerFactory.getLogger(DelegatingMetadataStorage.class);
32+
33+
private final CConfiguration cConf;
34+
private final MetadataStorageExtensionLoader extensionLoader;
35+
private MetadataStorage delegate;
36+
37+
@Inject
38+
DelegatingMetadataStorage(CConfiguration cConf, MetadataStorageExtensionLoader extensionLoader) {
39+
this.cConf = cConf;
40+
this.extensionLoader = extensionLoader;
41+
}
42+
43+
@Override
44+
public void createIndex() throws IOException {
45+
getDelegate().createIndex();
46+
}
47+
48+
@Override
49+
public void close() {
50+
if (delegate != null) {
51+
delegate.close();
52+
}
53+
}
54+
55+
@Override
56+
public String getName() {
57+
return cConf.get(Constants.Metadata.STORAGE_PROVIDER_IMPLEMENTATION);
58+
}
59+
60+
@Override
61+
public void dropIndex() throws IOException {
62+
getDelegate().dropIndex();
63+
}
64+
65+
@Override
66+
public MetadataChange apply(MetadataMutation mutation, MutationOptions options)
67+
throws IOException {
68+
return getDelegate().apply(mutation, options);
69+
}
70+
71+
@Override
72+
public List<MetadataChange> batch(List<? extends MetadataMutation> mutations,
73+
MutationOptions options) throws IOException {
74+
return getDelegate().batch(mutations, options);
75+
}
76+
77+
@Override
78+
public Metadata read(Read read) throws IOException {
79+
return getDelegate().read(read);
80+
}
81+
82+
@Override
83+
public SearchResponse search(SearchRequest request)
84+
throws IOException {
85+
return getDelegate().search(request);
86+
}
87+
88+
/**
89+
* Returns the {@link MetadataStorage} to use based on configuration.
90+
*/
91+
private MetadataStorage getDelegate() {
92+
MetadataStorage metadataStorage = this.delegate;
93+
if (metadataStorage != null) {
94+
return metadataStorage;
95+
}
96+
synchronized (this) {
97+
metadataStorage = this.delegate;
98+
if (metadataStorage != null) {
99+
return metadataStorage;
100+
}
101+
metadataStorage = extensionLoader.get(getName());
102+
103+
if (metadataStorage == null) {
104+
throw new IllegalArgumentException(
105+
"Unsupported metadata storage implementation " + getName());
106+
}
107+
LOG.info("Metadata Storage {} is loaded", metadataStorage.getName());
108+
try {
109+
metadataStorage.initialize(new DefaultMetadataStorageContext(this.cConf,
110+
metadataStorage.getName()));
111+
} catch (Exception e) {
112+
throw new RuntimeException(e);
113+
}
114+
LOG.info("Metadata storage {} is initialized.", metadataStorage.getName());
115+
116+
this.delegate = metadataStorage;
117+
return metadataStorage;
118+
}
119+
}
120+
}
121+
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.spi.metadata;
18+
19+
import com.google.inject.Inject;
20+
import io.cdap.cdap.common.conf.CConfiguration;
21+
import io.cdap.cdap.common.conf.Constants;
22+
import io.cdap.cdap.common.lang.ClassPathResources;
23+
import io.cdap.cdap.common.lang.FilterClassLoader;
24+
import io.cdap.cdap.extension.AbstractExtensionLoader;
25+
import java.io.IOException;
26+
import java.util.Collections;
27+
import java.util.Set;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
/**
32+
* Extension loader for {@link MetadataStorage} implementations.
33+
*/
34+
public class MetadataStorageExtensionLoader extends AbstractExtensionLoader<String, MetadataStorage> {
35+
36+
private static final Logger LOG = LoggerFactory.getLogger(MetadataStorageExtensionLoader.class);
37+
private static final Set<String> ALLOWED_RESOURCES = createAllowedResources();
38+
private static final Set<String> ALLOWED_PACKAGES = createPackageSets(ALLOWED_RESOURCES);
39+
40+
@Inject
41+
public MetadataStorageExtensionLoader(CConfiguration cConf) {
42+
super(cConf.get(Constants.Metadata.METADATA_STORAGE_EXT_DIR));
43+
}
44+
45+
private static Set<String> createAllowedResources() {
46+
try {
47+
return ClassPathResources.getResourcesWithDependencies(MetadataStorage.class.getClassLoader(),
48+
MetadataStorage.class);
49+
} catch (IOException e) {
50+
throw new RuntimeException("Failed to trace dependencies for MetadataStorage extension.", e);
51+
}
52+
}
53+
54+
@Override
55+
protected Set<String> getSupportedTypesForProvider(MetadataStorage metadataStorage) {
56+
return Collections.singleton(metadataStorage.getName());
57+
}
58+
59+
@Override
60+
protected FilterClassLoader.Filter getExtensionParentClassLoaderFilter() {
61+
return new FilterClassLoader.Filter() {
62+
@Override
63+
public boolean acceptResource(String resource) {
64+
return ALLOWED_RESOURCES.contains(resource);
65+
}
66+
67+
@Override
68+
public boolean acceptPackage(String packageName) {
69+
return ALLOWED_PACKAGES.contains(packageName);
70+
}
71+
};
72+
}
73+
}

cdap-data-fabric/src/main/java/io/cdap/cdap/spi/metadata/dataset/DatasetMetadataStorage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ public class DatasetMetadataStorage extends SearchHelper implements MetadataStor
7474
super(txClient, tableDefinition);
7575
}
7676

77+
@Override
78+
public String getName(){
79+
return getClass().getSimpleName();
80+
}
81+
7782
@Override
7883
public void createIndex() throws IOException {
7984
createDatasets();

0 commit comments

Comments
 (0)