Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cdap-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>test-jar</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,54 @@ public static void startAndWait(Service service, long timeout, TimeUnit timeoutU
throws TimeoutException, InterruptedException, ExecutionException {
startAndWait(service, timeout, timeoutUnit, null);
}

/**
* Starts a service and waits for it to be running, using reflection
* to be compatible with both Guava 13 and Guava 15+ / 20+.
*/
public static void startAndWait(Service service) {
try {
try {
// Guava 15+
service.getClass().getMethod("startAsync").invoke(service);
service.getClass().getMethod("awaitRunning").invoke(service);
} catch (NoSuchMethodException e) {
// Guava 13
Object future = service.getClass().getMethod("start").invoke(service);
if (future instanceof ListenableFuture) {
((ListenableFuture<?>) future).get();
}
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
}
} catch (Exception e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
}
}

/**
* Stops a service and waits for it to be terminated, using reflection
* to be compatible with both Guava 13 and Guava 15+ / 20+.
*/
public static void stopAndWait(Service service) {
try {
try {
// Guava 15+
service.getClass().getMethod("stopAsync").invoke(service);
service.getClass().getMethod("awaitTerminated").invoke(service);
} catch (NoSuchMethodException e) {
// Guava 13
Object future = service.getClass().getMethod("stop").invoke(service);
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
if (future instanceof ListenableFuture) {
((ListenableFuture<?>) future).get();
}
}
} catch (Exception e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package io.cdap.cdap.gateway.router;

import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
Expand Down Expand Up @@ -141,7 +141,7 @@ public Optional<InetSocketAddress> getBoundAddress() {
protected void startUp() throws Exception {
// If internal authorization enforcement is enabled, we avoid re-initialization of the token manager.
if (SecurityUtil.isManagedSecurity(cConf) && !SecurityUtil.isInternalAuthEnabled(cConf)) {
tokenValidator.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(tokenValidator);
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
Outdated
}
ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
serverCancellable = startServer(createServerBootstrap(channelGroup), channelGroup);
Expand All @@ -157,14 +157,14 @@ protected void shutDown() {
serverCancellable.cancel();
// If internal authorization enforcement is enabled, we avoid duplicate cleanup of the token manager.
if (SecurityUtil.isManagedSecurity(cConf) && !SecurityUtil.isInternalAuthEnabled(cConf)) {
tokenValidator.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(tokenValidator);

Comment thread
AbhishekKumar9984 marked this conversation as resolved.
Outdated
}

LOG.info("Stopped Netty Router.");
}

@Override
protected Executor executor(final State state) {
protected Executor executor() {
final AtomicInteger id = new AtomicInteger();
return runnable -> {
Thread t = new Thread(runnable, String.format("NettyRouter-%d", id.incrementAndGet()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void init(String[] args) {
LOG.info("Router initialized.");
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
throw Throwables.propagate(t);
throw new RuntimeException(t);
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
}
}

Expand All @@ -116,7 +116,7 @@ public void start() throws Exception {
+ "ZooKeeper quorum settings are correct in "
+ "cdap-site.xml. Currently configured as: %s",
zkClientService.getConnectString()));
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
Outdated
LOG.info("Router started.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.cdap.cdap.gateway;

import com.google.common.io.Closeables;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
Expand Down Expand Up @@ -179,38 +178,38 @@ protected void configure() {

messagingService = injector.getInstance(MessagingService.class);
if (messagingService instanceof Service) {
((Service) messagingService).startAndWait();
io.cdap.cdap.common.service.Services.startAndWait((Service) messagingService);
}
txService = injector.getInstance(TransactionManager.class);
txService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(txService);
// Define all StructuredTable before starting any services that need StructuredTable
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));
metadataStorage = injector.getInstance(MetadataStorage.class);
metadataStorage.createIndex();
metadataService = injector.getInstance(MetadataService.class);
metadataService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(metadataService);

dsOpService = injector.getInstance(DatasetOpExecutorService.class);
dsOpService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(dsOpService);
datasetService = injector.getInstance(DatasetService.class);
datasetService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(datasetService);
appFabricServer = injector.getInstance(AppFabricServer.class);
appFabricServer.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(appFabricServer);
appFabricProcessorService = injector.getInstance(AppFabricProcessorService.class);
appFabricProcessorService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(appFabricProcessorService);
logQueryService = injector.getInstance(LogQueryService.class);
logQueryService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(logQueryService);
metricsQueryService = injector.getInstance(MetricsQueryService.class);
metricsQueryService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(metricsQueryService);
metricsCollectionService = injector.getInstance(MetricsCollectionService.class);
metricsCollectionService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(metricsCollectionService);
namespaceAdmin = injector.getInstance(NamespaceAdmin.class);
namespaceAdmin.create(TEST_NAMESPACE_META1);
namespaceAdmin.create(TEST_NAMESPACE_META2);

// Restart handlers to check if they are resilient across restarts.
router = injector.getInstance(NettyRouter.class);
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);
port = router.getBoundAddress().orElseThrow(IllegalStateException::new).getPort();

return injector;
Expand All @@ -220,19 +219,25 @@ public static void stopGateway(CConfiguration conf) throws Exception {
namespaceAdmin.delete(new NamespaceId(TEST_NAMESPACE1));
namespaceAdmin.delete(new NamespaceId(TEST_NAMESPACE2));
namespaceAdmin.delete(NamespaceId.DEFAULT);
appFabricServer.stopAndWait();
appFabricProcessorService.stopAndWait();
metricsCollectionService.stopAndWait();
metricsQueryService.stopAndWait();
logQueryService.stopAndWait();
router.stopAndWait();
datasetService.stopAndWait();
dsOpService.stopAndWait();
metadataService.stopAndWait();
Closeables.closeQuietly(metadataStorage);
txService.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(appFabricServer);
io.cdap.cdap.common.service.Services.stopAndWait(appFabricProcessorService);
io.cdap.cdap.common.service.Services.stopAndWait(metricsCollectionService);
io.cdap.cdap.common.service.Services.stopAndWait(metricsQueryService);
io.cdap.cdap.common.service.Services.stopAndWait(logQueryService);
io.cdap.cdap.common.service.Services.stopAndWait(router);
io.cdap.cdap.common.service.Services.stopAndWait(datasetService);
io.cdap.cdap.common.service.Services.stopAndWait(dsOpService);
io.cdap.cdap.common.service.Services.stopAndWait(metadataService);
try {

metadataStorage.close();

} catch (Exception ignored) {

}
io.cdap.cdap.common.service.Services.stopAndWait(txService);
if (messagingService instanceof Service) {
((Service) messagingService).stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait((Service) messagingService);
}
conf.clear();
}
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,30 +149,29 @@ protected void configure() {
}));

transactionManager = injector.getInstance(TransactionManager.class);
transactionManager.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(transactionManager);
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));
dsOpService = injector.getInstance(DatasetOpExecutorService.class);
dsOpService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(dsOpService);

datasetService = injector.getInstance(DatasetService.class);
datasetService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(datasetService);

Comment thread
AbhishekKumar9984 marked this conversation as resolved.
logQueryService = injector.getInstance(LogQueryService.class);
logQueryService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(logQueryService);

mockLogReader = (MockLogReader) injector.getInstance(LogReader.class);
mockLogReader = (MockLogReader) injector.getInstance(LogReader.class);
mockLogReader.generateLogs();

discoveryServiceClient = injector.getInstance(DiscoveryServiceClient.class);
}

@AfterClass
public static void tearDown() {
logQueryService.stopAndWait();

datasetService.stopAndWait();
dsOpService.stopAndWait();
transactionManager.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(logQueryService);
io.cdap.cdap.common.service.Services.stopAndWait(datasetService);
io.cdap.cdap.common.service.Services.stopAndWait(dsOpService);
io.cdap.cdap.common.service.Services.stopAndWait(transactionManager);
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
Outdated
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,20 @@ protected void configure() {
}));

transactionManager = injector.getInstance(TransactionManager.class);
transactionManager.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(transactionManager);
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));

dsOpService = injector.getInstance(DatasetOpExecutorService.class);
dsOpService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(dsOpService);

datasetService = injector.getInstance(DatasetService.class);
datasetService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(datasetService);

metrics = injector.getInstance(MetricsQueryService.class);
metrics.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(metrics);

collectionService = injector.getInstance(MetricsCollectionService.class);
collectionService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(collectionService);

// initialize the dataset instantiator
DiscoveryServiceClient discoveryClient = injector.getInstance(DiscoveryServiceClient.class);
Expand All @@ -202,11 +202,11 @@ protected void configure() {
}

public static void stopMetricsService(CConfiguration conf) {
collectionService.stopAndWait();
datasetService.stopAndWait();
dsOpService.stopAndWait();
transactionManager.stopAndWait();
metrics.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(collectionService);
io.cdap.cdap.common.service.Services.stopAndWait(datasetService);
io.cdap.cdap.common.service.Services.stopAndWait(dsOpService);
io.cdap.cdap.common.service.Services.stopAndWait(transactionManager);
io.cdap.cdap.common.service.Services.stopAndWait(metrics);
conf.clear();
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public static void init() throws Exception {
successValidator,
new MockAccessTokenIdentityExtractor(successValidator), discoveryService,
new NoOpAeadCipher());
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);

httpService = NettyHttpService.builder("test").setHttpHandlers(new TestHandler()).build();
httpService.start();
Expand All @@ -119,7 +119,7 @@ public static void init() throws Exception {
public static void finish() throws Exception {
cancelDiscovery.cancel();
httpService.stop();
router.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(router);
}
Comment thread
AbhishekKumar9984 marked this conversation as resolved.

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ protected void startUp() {
new RouterServiceLookup(cConf, (DiscoveryServiceClient) discoveryService,
new RouterPathLookup()),
validator, userIdentityExtractor, discoveryServiceClient, new NoOpAeadCipher());
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);
}

@Override
protected void shutDown() {
router.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(router);
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
Outdated
}

InetSocketAddress getRouterAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void init() throws Exception {
successValidator,
new MockAccessTokenIdentityExtractor(successValidator), discoveryService,
new NoOpAeadCipher());
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);

httpService = NettyHttpService.builder("test").setHttpHandlers(new AuditLogTest.TestHandler())
.build();
Expand Down Expand Up @@ -121,7 +121,7 @@ public void testRouterStatus() throws Exception {
public static void finish() throws Exception {
cancelDiscovery.cancel();
httpService.stop();
router.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(router);
}

private void testGet(int expectedStatus, String expectedResponse, String path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ protected void startUp() {
new RouterPathLookup()),
new SuccessTokenValidator(), userIdentityExtractor, discoveryServiceClient,
new NoOpAeadCipher());
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);
}

@Override
protected void shutDown() {
router.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(router);
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
Outdated
}

public InetSocketAddress getRouterAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ protected void startUp() {
new RouterPathLookup()),
new SuccessTokenValidator(), userIdentityExtractor, discoveryServiceClient,
new NoOpAeadCipher());
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);
}

@Override
protected void shutDown() {
router.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(router);
}

public InetSocketAddress getRouterAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private void deploy(int num) throws Exception {

LocationFactory lf = new LocalLocationFactory(TMP_FOLDER.newFolder());
Location programJar = AppJarHelper.createDeploymentJar(lf, DummyApp.class);
GATEWAY_SERVER.setExpectedJarBytes(ByteStreams.toByteArray(Locations.newInputSupplier(programJar)));
GATEWAY_SERVER.setExpectedJarBytes(ByteStreams.toByteArray(Locations.newInputSupplier(programJar).getInput()));

for (int i = 0; i < num; i++) {
LOG.info("Deploying {}/{}", i, num);
Expand All @@ -220,7 +220,7 @@ private void deploy(int num) throws Exception {
urlConn.setDoOutput(true);
urlConn.setDoInput(true);

ByteStreams.copy(Locations.newInputSupplier(programJar), urlConn.getOutputStream());
ByteStreams.copy(Locations.newInputSupplier(programJar).getInput(), urlConn.getOutputStream());
Assert.assertEquals(200, urlConn.getResponseCode());
urlConn.getInputStream().close();
urlConn.disconnect();
Expand Down
Loading