Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cdap-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>test-jar</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,54 @@ public static void startAndWait(Service service, long timeout, TimeUnit timeoutU
throws TimeoutException, InterruptedException, ExecutionException {
startAndWait(service, timeout, timeoutUnit, null);
}

/**
* Starts a service and waits for it to be running, using reflection
* to be compatible with both Guava 13 and Guava 15+ / 20+.
*/
public static void startAndWait(Service service) {
try {
try {
// Guava 15+
service.getClass().getMethod("startAsync").invoke(service);
service.getClass().getMethod("awaitRunning").invoke(service);
} catch (NoSuchMethodException e) {
// Guava 13
Object future = service.getClass().getMethod("start").invoke(service);
if (future instanceof ListenableFuture) {
((ListenableFuture<?>) future).get();
}
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
}
} catch (Exception e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
}
}

/**
* Stops a service and waits for it to be terminated, using reflection
* to be compatible with both Guava 13 and Guava 15+ / 20+.
*/
public static void stopAndWait(Service service) {
try {
try {
// Guava 15+
service.getClass().getMethod("stopAsync").invoke(service);
service.getClass().getMethod("awaitTerminated").invoke(service);
} catch (NoSuchMethodException e) {
// Guava 13
Object future = service.getClass().getMethod("stop").invoke(service);
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
if (future instanceof ListenableFuture) {
((ListenableFuture<?>) future).get();
}
}
} catch (Exception e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package io.cdap.cdap.gateway.router;

import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
Expand All @@ -30,6 +30,7 @@
import io.cdap.cdap.common.encryption.guice.UserCredentialAeadEncryptionModule;
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.common.security.KeyStores;
import io.cdap.cdap.common.service.Services;
import io.cdap.cdap.gateway.router.handlers.AuditLogHandler;
import io.cdap.cdap.gateway.router.handlers.AuthenticationHandler;
import io.cdap.cdap.gateway.router.handlers.ConfigBasedRequestBlockingHandler;
Expand Down Expand Up @@ -141,7 +142,7 @@ public Optional<InetSocketAddress> getBoundAddress() {
protected void startUp() throws Exception {
// If internal authorization enforcement is enabled, we avoid re-initialization of the token manager.
if (SecurityUtil.isManagedSecurity(cConf) && !SecurityUtil.isInternalAuthEnabled(cConf)) {
tokenValidator.startAndWait();
Services.startAndWait(tokenValidator);
}
ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
serverCancellable = startServer(createServerBootstrap(channelGroup), channelGroup);
Expand All @@ -157,14 +158,13 @@ protected void shutDown() {
serverCancellable.cancel();
// If internal authorization enforcement is enabled, we avoid duplicate cleanup of the token manager.
if (SecurityUtil.isManagedSecurity(cConf) && !SecurityUtil.isInternalAuthEnabled(cConf)) {
tokenValidator.stopAndWait();
Services.stopAndWait(tokenValidator);
}

LOG.info("Stopped Netty Router.");
}

@Override
protected Executor executor(final State state) {
protected Executor executor() {
final AtomicInteger id = new AtomicInteger();
return runnable -> {
Thread t = new Thread(runnable, String.format("NettyRouter-%d", id.incrementAndGet()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import io.cdap.cdap.common.guice.ZkClientModule;
import io.cdap.cdap.common.guice.ZkDiscoveryModule;
import io.cdap.cdap.common.runtime.DaemonMain;
import io.cdap.cdap.common.service.Services;
import io.cdap.cdap.security.guice.CoreSecurityRuntimeModule;
import io.cdap.cdap.security.guice.ExternalAuthenticationModule;
import io.cdap.cdap.security.impersonation.SecurityUtil;
import java.util.concurrent.TimeUnit;
import org.apache.twill.internal.Services;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -101,29 +101,30 @@ public void init(String[] args) {
LOG.info("Router initialized.");
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
throw Throwables.propagate(t);
Throwables.propagateIfPossible(t);
throw new RuntimeException(t);
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
}
}

@Override
public void start() throws Exception {
LOG.info("Starting Router...");
io.cdap.cdap.common.service.Services.startAndWait(zkClientService,
Services.startAndWait(zkClientService,
cConf.getLong(Constants.Zookeeper.CLIENT_STARTUP_TIMEOUT_MILLIS),
TimeUnit.MILLISECONDS,
String.format("Connection timed out while trying to start "
+ "ZooKeeper client. Please verify that the "
+ "ZooKeeper quorum settings are correct in "
+ "cdap-site.xml. Currently configured as: %s",
zkClientService.getConnectString()));
router.startAndWait();
Services.startAndWait(router);
LOG.info("Router started.");
}

@Override
public void stop() {
LOG.info("Stopping Router...");
Futures.getUnchecked(Services.chainStop(router, zkClientService));
Futures.getUnchecked(org.apache.twill.internal.Services.chainStop(router, zkClientService));
LOG.info("Router stopped.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.cdap.cdap.gateway;

import com.google.common.io.Closeables;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
Expand All @@ -32,6 +31,7 @@
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.namespace.NamespaceAdmin;
import io.cdap.cdap.common.service.Services;
import io.cdap.cdap.common.utils.Networks;
import io.cdap.cdap.common.utils.Tasks;
import io.cdap.cdap.data2.datafabric.dataset.service.DatasetService;
Expand Down Expand Up @@ -179,38 +179,38 @@ protected void configure() {

messagingService = injector.getInstance(MessagingService.class);
if (messagingService instanceof Service) {
((Service) messagingService).startAndWait();
Services.startAndWait((Service) messagingService);
}
txService = injector.getInstance(TransactionManager.class);
txService.startAndWait();
Services.startAndWait(txService);
// Define all StructuredTable before starting any services that need StructuredTable
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));
metadataStorage = injector.getInstance(MetadataStorage.class);
metadataStorage.createIndex();
metadataService = injector.getInstance(MetadataService.class);
metadataService.startAndWait();
Services.startAndWait(metadataService);

dsOpService = injector.getInstance(DatasetOpExecutorService.class);
dsOpService.startAndWait();
Services.startAndWait(dsOpService);
datasetService = injector.getInstance(DatasetService.class);
datasetService.startAndWait();
Services.startAndWait(datasetService);
appFabricServer = injector.getInstance(AppFabricServer.class);
appFabricServer.startAndWait();
Services.startAndWait(appFabricServer);
appFabricProcessorService = injector.getInstance(AppFabricProcessorService.class);
appFabricProcessorService.startAndWait();
Services.startAndWait(appFabricProcessorService);
logQueryService = injector.getInstance(LogQueryService.class);
logQueryService.startAndWait();
Services.startAndWait(logQueryService);
metricsQueryService = injector.getInstance(MetricsQueryService.class);
metricsQueryService.startAndWait();
Services.startAndWait(metricsQueryService);
metricsCollectionService = injector.getInstance(MetricsCollectionService.class);
metricsCollectionService.startAndWait();
Services.startAndWait(metricsCollectionService);
namespaceAdmin = injector.getInstance(NamespaceAdmin.class);
namespaceAdmin.create(TEST_NAMESPACE_META1);
namespaceAdmin.create(TEST_NAMESPACE_META2);

// Restart handlers to check if they are resilient across restarts.
router = injector.getInstance(NettyRouter.class);
router.startAndWait();
Services.startAndWait(router);
port = router.getBoundAddress().orElseThrow(IllegalStateException::new).getPort();

return injector;
Expand All @@ -220,19 +220,25 @@ public static void stopGateway(CConfiguration conf) throws Exception {
namespaceAdmin.delete(new NamespaceId(TEST_NAMESPACE1));
namespaceAdmin.delete(new NamespaceId(TEST_NAMESPACE2));
namespaceAdmin.delete(NamespaceId.DEFAULT);
appFabricServer.stopAndWait();
appFabricProcessorService.stopAndWait();
metricsCollectionService.stopAndWait();
metricsQueryService.stopAndWait();
logQueryService.stopAndWait();
router.stopAndWait();
datasetService.stopAndWait();
dsOpService.stopAndWait();
metadataService.stopAndWait();
Closeables.closeQuietly(metadataStorage);
txService.stopAndWait();
Services.stopAndWait(appFabricServer);
Services.stopAndWait(appFabricProcessorService);
Services.stopAndWait(metricsCollectionService);
Services.stopAndWait(metricsQueryService);
Services.stopAndWait(logQueryService);
Services.stopAndWait(router);
Services.stopAndWait(datasetService);
Services.stopAndWait(dsOpService);
Services.stopAndWait(metadataService);
try {

metadataStorage.close();

} catch (Exception ignored) {

}
Services.stopAndWait(txService);
if (messagingService instanceof Service) {
((Service) messagingService).stopAndWait();
Services.stopAndWait((Service) messagingService);
}
conf.clear();
}
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.cdap.cdap.common.guice.RemoteAuthenticatorModules;
import io.cdap.cdap.common.http.DefaultHttpRequestConfig;
import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService;
import io.cdap.cdap.common.service.Services;
import io.cdap.cdap.data.runtime.DataFabricModules;
import io.cdap.cdap.data.runtime.DataSetServiceModules;
import io.cdap.cdap.data.runtime.DataSetsModules;
Expand Down Expand Up @@ -149,30 +150,29 @@ protected void configure() {
}));

transactionManager = injector.getInstance(TransactionManager.class);
transactionManager.startAndWait();
Services.startAndWait(transactionManager);
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));
dsOpService = injector.getInstance(DatasetOpExecutorService.class);
dsOpService.startAndWait();
Services.startAndWait(dsOpService);

datasetService = injector.getInstance(DatasetService.class);
datasetService.startAndWait();
Services.startAndWait(datasetService);

Comment thread
AbhishekKumar9984 marked this conversation as resolved.
logQueryService = injector.getInstance(LogQueryService.class);
logQueryService.startAndWait();
Services.startAndWait(logQueryService);

mockLogReader = (MockLogReader) injector.getInstance(LogReader.class);
mockLogReader = (MockLogReader) injector.getInstance(LogReader.class);
mockLogReader.generateLogs();

discoveryServiceClient = injector.getInstance(DiscoveryServiceClient.class);
}

@AfterClass
public static void tearDown() {
logQueryService.stopAndWait();

datasetService.stopAndWait();
dsOpService.stopAndWait();
transactionManager.stopAndWait();
Services.stopAndWait(logQueryService);
Services.stopAndWait(datasetService);
Services.stopAndWait(dsOpService);
Services.stopAndWait(transactionManager);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.inject.util.Modules;
import io.cdap.cdap.api.metrics.MetricStore;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.common.service.Services;
import io.cdap.cdap.app.metrics.MapReduceMetrics;
import io.cdap.cdap.app.store.Store;
import io.cdap.cdap.common.conf.CConfiguration;
Expand Down Expand Up @@ -174,20 +175,20 @@ protected void configure() {
}));

transactionManager = injector.getInstance(TransactionManager.class);
transactionManager.startAndWait();
Services.startAndWait(transactionManager);
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));

dsOpService = injector.getInstance(DatasetOpExecutorService.class);
dsOpService.startAndWait();
Services.startAndWait(dsOpService);

datasetService = injector.getInstance(DatasetService.class);
datasetService.startAndWait();
Services.startAndWait(datasetService);

metrics = injector.getInstance(MetricsQueryService.class);
metrics.startAndWait();
Services.startAndWait(metrics);

collectionService = injector.getInstance(MetricsCollectionService.class);
collectionService.startAndWait();
Services.startAndWait(collectionService);

// initialize the dataset instantiator
DiscoveryServiceClient discoveryClient = injector.getInstance(DiscoveryServiceClient.class);
Expand All @@ -202,11 +203,11 @@ protected void configure() {
}

public static void stopMetricsService(CConfiguration conf) {
collectionService.stopAndWait();
datasetService.stopAndWait();
dsOpService.stopAndWait();
transactionManager.stopAndWait();
metrics.stopAndWait();
Services.stopAndWait(collectionService);
Services.stopAndWait(datasetService);
Services.stopAndWait(dsOpService);
Services.stopAndWait(transactionManager);
Services.stopAndWait(metrics);
conf.clear();
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.cdap.cdap.common.encryption.NoOpAeadCipher;
import io.cdap.cdap.common.security.AuditDetail;
import io.cdap.cdap.common.security.AuditPolicy;
import io.cdap.cdap.common.service.Services;
import io.cdap.cdap.security.auth.TokenValidator;
import io.cdap.http.AbstractHttpHandler;
import io.cdap.http.HttpResponder;
Expand Down Expand Up @@ -103,7 +104,7 @@ public static void init() throws Exception {
successValidator,
new MockAccessTokenIdentityExtractor(successValidator), discoveryService,
new NoOpAeadCipher());
router.startAndWait();
Services.startAndWait(router);

httpService = NettyHttpService.builder("test").setHttpHandlers(new TestHandler()).build();
httpService.start();
Expand All @@ -119,7 +120,7 @@ public static void init() throws Exception {
public static void finish() throws Exception {
cancelDiscovery.cancel();
httpService.stop();
router.stopAndWait();
Services.stopAndWait(router);
}
Comment thread
AbhishekKumar9984 marked this conversation as resolved.

@Test
Expand Down
Loading