Skip to content

Commit 11b9197

Browse files
author
Komal Yadav
committed
Add SpannerMetadataModule with ExtensionLoader
Format Changes changes Changes name-changes formatting-changes elastic-changes changes Added copyright section Updated Updated Added Javadocs Corrected Copyright Corrected Copyright Lexicographical order correction Formatting correction Formatting correction changes Updated PR Updated names Updated comments
1 parent 60e58d4 commit 11b9197

20 files changed

Lines changed: 637 additions & 16 deletions

File tree

cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2114,9 +2114,13 @@ public static final class Metadata {
21142114
public static final String STORAGE_PROVIDER_IMPLEMENTATION = "metadata.storage.implementation";
21152115
public static final String STORAGE_PROVIDER_NOSQL = "nosql";
21162116
public static final String STORAGE_PROVIDER_ELASTICSEARCH = "elastic";
2117+
public static final String STORAGE_PROVIDER_SPANNER = "gcp-spanner";
21172118

21182119
public static final String METADATA_WRITER_SUBSCRIBER = "metadata.writer";
21192120
public static final String METADATA_CONSUMER_WRITER_SUBSCRIBER = "metadata.consumer.writer";
2121+
2122+
// Metadata configs
2123+
public static final String METADATA_STORAGE_EXT_DIR = "metadata.storage.extensions.dir";
21202124
}
21212125

21222126
/**

cdap-common/src/main/resources/cdap-default.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2845,6 +2845,11 @@
28452845
<value>/opt/cdap/master/ext/log-publisher</value>
28462846
</property>
28472847

2848+
<property>
2849+
<name>metadata.storage.extensions.dir</name>
2850+
<value>/opt/cdap/master/ext/metadata-storage</value>
2851+
</property>
2852+
28482853
<!-- Metrics Configuration -->
28492854

28502855
<property>

cdap-data-fabric/src/main/java/io/cdap/cdap/data/runtime/DataSetsModules.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import io.cdap.cdap.data2.registry.UsageWriter;
4646
import io.cdap.cdap.metadata.elastic.ElasticsearchMetadataStorage;
4747
import io.cdap.cdap.security.impersonation.OwnerStore;
48+
import io.cdap.cdap.spi.metadata.DelegatingMetadataStorage;
4849
import io.cdap.cdap.spi.metadata.MetadataStorage;
4950
import io.cdap.cdap.spi.metadata.dataset.DatasetMetadataStorage;
5051
import io.cdap.cdap.spi.metadata.noop.NoopMetadataStorage;
@@ -178,11 +179,21 @@ public MetadataStorage get() {
178179
if (Constants.Metadata.STORAGE_PROVIDER_NOSQL.equalsIgnoreCase(config)) {
179180
return injector.getInstance(DatasetMetadataStorage.class);
180181
}
182+
183+
// TODO (CDAP-21173): Load elastic search implementation using DelegatingMetadataStorage
181184
if (Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH.equalsIgnoreCase(config)) {
182185
return injector.getInstance(ElasticsearchMetadataStorage.class);
183186
}
184-
throw new IllegalArgumentException("Unsupported MetadataStorage '" + config + "'. Only '"
185-
+ Constants.Metadata.STORAGE_PROVIDER_NOSQL + "' and '"
186-
+ Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH + "' are allowed.");
187+
if (Constants.Metadata.STORAGE_PROVIDER_SPANNER.equalsIgnoreCase(config)) {
188+
return injector.getInstance(DelegatingMetadataStorage.class);
189+
}
190+
String errorMessage = String.format(
191+
"Unsupported MetadataStorage '%s'. Only '%s', '%s' and '%s' are allowed.",
192+
config,
193+
Constants.Metadata.STORAGE_PROVIDER_NOSQL,
194+
Constants.Metadata.STORAGE_PROVIDER_SPANNER,
195+
Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH
196+
);
197+
throw new IllegalArgumentException(errorMessage);
187198
}
188199
}

cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/AuditMetadataStorage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ public void setAuditPublisher(AuditPublisher auditPublisher) {
8383
this.auditPublisher = auditPublisher;
8484
}
8585

86+
@Override
87+
public String getName() {
88+
return getClass().getSimpleName();
89+
}
90+
8691
@Override
8792
public void createIndex() throws IOException {
8893
try {
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.spi.metadata;
18+
19+
import io.cdap.cdap.common.conf.CConfiguration;
20+
import io.cdap.cdap.common.conf.Constants;
21+
import java.util.Collections;
22+
import java.util.Map;
23+
24+
// TODO(CDAP-21174): Create metadata storage specify properties
25+
/**
26+
* Default implementation of the {@link MetadataStorageContext}.
27+
*/
28+
public class DefaultMetadataStorageContext implements MetadataStorageContext {
29+
30+
private static final String storageImpl = "gcp-spanner";
31+
private final Map<String, String> properties;
32+
33+
protected DefaultMetadataStorageContext(CConfiguration cConf, String storageName) {
34+
String propertiesPrefix =
35+
Constants.Dataset.STORAGE_EXTENSION_PROPERTY_PREFIX + storageImpl + ".";
36+
this.properties = Collections.unmodifiableMap(cConf.getPropsWithPrefix(propertiesPrefix));
37+
}
38+
39+
@Override
40+
public Map<String, String> getProperties() {
41+
return properties;
42+
}
43+
}
44+
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.spi.metadata;
18+
19+
import com.google.inject.Inject;
20+
import io.cdap.cdap.common.conf.CConfiguration;
21+
import io.cdap.cdap.common.conf.Constants;
22+
import java.io.IOException;
23+
import java.util.List;
24+
25+
/**
26+
* Delegates {@link io.cdap.cdap.spi.metadata.MetadataStorage} based on configured extension.
27+
*/
28+
public class DelegatingMetadataStorage implements MetadataStorage {
29+
private final CConfiguration cConf;
30+
private final MetadataStorage delegate;
31+
32+
@Inject
33+
DelegatingMetadataStorage(CConfiguration cConf, MetadataStorageExtensionLoader extensionLoader) throws Exception {
34+
this.cConf = cConf;
35+
36+
this.delegate = extensionLoader.get(getName());
37+
38+
if (this.delegate == null) {
39+
throw new IllegalArgumentException("Unsupported MetadataProvider type: " + getName());
40+
}
41+
this.delegate.initialize(new DefaultMetadataStorageContext(cConf, getName()));
42+
}
43+
44+
@Override
45+
public void createIndex() throws IOException {
46+
delegate.createIndex();
47+
}
48+
49+
@Override
50+
public void close() {
51+
if (delegate != null) {
52+
delegate.close();
53+
}
54+
}
55+
56+
@Override
57+
public String getName() {
58+
return cConf.get(Constants.Metadata.STORAGE_PROVIDER_IMPLEMENTATION);
59+
}
60+
61+
@Override
62+
public void dropIndex() throws IOException {
63+
delegate.dropIndex();
64+
}
65+
66+
@Override
67+
public MetadataChange apply(MetadataMutation mutation, MutationOptions options)
68+
throws IOException {
69+
return delegate.apply(mutation, options);
70+
}
71+
72+
@Override
73+
public List<MetadataChange> batch(List<? extends MetadataMutation> mutations,
74+
MutationOptions options) throws IOException {
75+
return delegate.batch(mutations, options);
76+
}
77+
78+
@Override
79+
public Metadata read(Read read) throws IOException {
80+
return delegate.read(read);
81+
}
82+
83+
@Override
84+
public SearchResponse search(SearchRequest request)
85+
throws IOException {
86+
return delegate.search(request);
87+
}
88+
}
89+
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.spi.metadata;
18+
19+
import com.google.inject.Inject;
20+
import io.cdap.cdap.common.conf.CConfiguration;
21+
import io.cdap.cdap.common.conf.Constants;
22+
import io.cdap.cdap.common.lang.ClassPathResources;
23+
import io.cdap.cdap.common.lang.FilterClassLoader;
24+
import io.cdap.cdap.extension.AbstractExtensionLoader;
25+
import java.io.IOException;
26+
import java.util.Collections;
27+
import java.util.Set;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
/**
32+
* Extension loader for {@link MetadataStorage} implementations.
33+
*/
34+
public class MetadataStorageExtensionLoader extends AbstractExtensionLoader<String, MetadataStorage> {
35+
36+
private static final Logger LOG = LoggerFactory.getLogger(MetadataStorageExtensionLoader.class);
37+
private static final Set<String> ALLOWED_RESOURCES = createAllowedResources();
38+
private static final Set<String> ALLOWED_PACKAGES = createPackageSets(ALLOWED_RESOURCES);
39+
40+
@Inject
41+
public MetadataStorageExtensionLoader(CConfiguration cConf) {
42+
super(cConf.get(Constants.Metadata.METADATA_STORAGE_EXT_DIR));
43+
}
44+
45+
private static Set<String> createAllowedResources() {
46+
try {
47+
return ClassPathResources.getResourcesWithDependencies(MetadataStorage.class.getClassLoader(),
48+
MetadataStorage.class);
49+
} catch (IOException e) {
50+
throw new RuntimeException("Failed to trace dependencies for MetadataStorage extension.", e);
51+
}
52+
}
53+
54+
@Override
55+
protected Set<String> getSupportedTypesForProvider(MetadataStorage metadataStorage) {
56+
return Collections.singleton(metadataStorage.getName());
57+
}
58+
59+
@Override
60+
protected FilterClassLoader.Filter getExtensionParentClassLoaderFilter() {
61+
return new FilterClassLoader.Filter() {
62+
@Override
63+
public boolean acceptResource(String resource) {
64+
return ALLOWED_RESOURCES.contains(resource);
65+
}
66+
67+
@Override
68+
public boolean acceptPackage(String packageName) {
69+
return ALLOWED_PACKAGES.contains(packageName);
70+
}
71+
};
72+
}
73+
}

cdap-data-fabric/src/main/java/io/cdap/cdap/spi/metadata/dataset/DatasetMetadataStorage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ public class DatasetMetadataStorage extends SearchHelper implements MetadataStor
7474
super(txClient, tableDefinition);
7575
}
7676

77+
@Override
78+
public String getName(){
79+
return getClass().getSimpleName();
80+
}
81+
7782
@Override
7883
public void createIndex() throws IOException {
7984
createDatasets();

cdap-data-fabric/src/main/java/io/cdap/cdap/spi/metadata/noop/NoopMetadataStorage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@
3434
*/
3535
public class NoopMetadataStorage implements MetadataStorage {
3636

37+
@Override
38+
public String getName() {
39+
return getClass().getSimpleName();
40+
}
41+
3742
@Override
3843
public void createIndex() throws IOException {
3944
// no-op

cdap-elastic/src/main/java/io/cdap/cdap/metadata/elastic/RequestAndChange.java renamed to cdap-elastic/src/main/java/io/cdap/cdap/metadata/elastic/ElasticChangeRequest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,20 @@
1616

1717
package io.cdap.cdap.metadata.elastic;
1818

19+
import io.cdap.cdap.spi.metadata.ChangeRequest;
1920
import io.cdap.cdap.spi.metadata.MetadataChange;
2021
import org.elasticsearch.action.support.WriteRequest;
2122

2223
/**
2324
* A simple class to pass around an Elasticsearch index write request, along with the metadata
2425
* change that it effects.
2526
*/
26-
public class RequestAndChange {
27+
public class ElasticChangeRequest implements ChangeRequest {
2728

2829
private final WriteRequest<?> request;
2930
private final MetadataChange change;
3031

31-
public RequestAndChange(WriteRequest<?> request, MetadataChange change) {
32+
public ElasticChangeRequest(WriteRequest<?> request, MetadataChange change) {
3233
this.request = request;
3334
this.change = change;
3435
}
@@ -37,6 +38,7 @@ public WriteRequest<?> getRequest() {
3738
return request;
3839
}
3940

41+
@Override
4042
public MetadataChange getChange() {
4143
return change;
4244
}

0 commit comments

Comments
 (0)