Skip to content

Commit 49238e4

Browse files
authored
Merge pull request #16129 from cdapio/fix/logging-context-propagation
[CDAP-21245] Fix: Propagate parent logging context to asynchronous action and fork threads
2 parents a396452 + c917e6e commit 49238e4

1 file changed

Lines changed: 35 additions & 19 deletions

File tree

cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowDriver.java

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import io.cdap.cdap.common.lang.Exceptions;
6363
import io.cdap.cdap.common.lang.InstantiatorFactory;
6464
import io.cdap.cdap.common.lang.PropertyFieldSetter;
65+
import io.cdap.cdap.common.logging.LoggingContext;
6566
import io.cdap.cdap.common.logging.LoggingContextAccessor;
6667
import io.cdap.cdap.common.namespace.NamespaceQueryAdmin;
6768
import io.cdap.cdap.common.service.Retries;
@@ -328,29 +329,36 @@ private void executeAction(WorkflowActionNode node, WorkflowToken token) throws
328329
ExecutorService executorService = createExecutor(1, executorTerminateLatch,
329330
"action-" + node.getNodeId() + "-%d");
330331

332+
// Capture context for propagation
333+
final LoggingContext parentLoggingContext = LoggingContextAccessor.getLoggingContext();
334+
331335
try {
332336
// Run the action in new thread
333337
Future<?> future = executorService.submit(new Callable<Void>() {
334338
@Override
335339
public Void call() throws Exception {
336-
SchedulableProgramType programType = node.getProgram().getProgramType();
337-
String programName = node.getProgram().getProgramName();
338-
String prettyProgramType = ProgramType.valueOf(programType.name()).getPrettyName();
339-
ProgramWorkflowRunner programWorkflowRunner =
340-
workflowProgramRunnerFactory.getProgramWorkflowRunner(programType, token,
341-
node.getNodeId(), nodeStates);
342-
343-
// this should not happen, since null is only passed in from WorkflowDriver, only when calling configure
344-
if (programWorkflowRunner == null) {
345-
throw new UnsupportedOperationException("Operation not allowed.");
346-
}
340+
try (LoggingContextAccessor.LoggingContextRestorer ignored =
341+
parentLoggingContext != null ? LoggingContextAccessor.setLoggingContext(parentLoggingContext)
342+
: null) {
343+
SchedulableProgramType programType = node.getProgram().getProgramType();
344+
String programName = node.getProgram().getProgramName();
345+
String prettyProgramType = ProgramType.valueOf(programType.name()).getPrettyName();
346+
ProgramWorkflowRunner programWorkflowRunner =
347+
workflowProgramRunnerFactory.getProgramWorkflowRunner(programType, token,
348+
node.getNodeId(), nodeStates);
349+
350+
// this should not happen, since null is only passed in from WorkflowDriver, only when calling configure
351+
if (programWorkflowRunner == null) {
352+
throw new UnsupportedOperationException("Operation not allowed.");
353+
}
347354

348-
Runnable programRunner = programWorkflowRunner.create(programName);
349-
LOG.info("Starting {} Program '{}' in workflow", prettyProgramType, programName);
350-
programRunner.run();
355+
Runnable programRunner = programWorkflowRunner.create(programName);
356+
LOG.info("Starting {} Program '{}' in workflow", prettyProgramType, programName);
357+
programRunner.run();
351358

352-
LOG.info("{} Program '{}' in workflow completed", prettyProgramType, programName);
353-
return null;
359+
LOG.info("{} Program '{}' in workflow completed", prettyProgramType, programName);
360+
return null;
361+
}
354362
}
355363
});
356364
future.get();
@@ -376,14 +384,22 @@ private void executeFork(final ApplicationSpecification appSpec, WorkflowForkNod
376384
CompletionService<Map.Entry<String, WorkflowToken>> completionService =
377385
new ExecutorCompletionService<>(executorService);
378386

387+
// Capture the logging context from the parent thread to propagate to fork branches
388+
final LoggingContext parentLoggingContext = LoggingContextAccessor.getLoggingContext();
389+
379390
try {
380391
for (final List<WorkflowNode> branch : fork.getBranches()) {
381392
completionService.submit(new Callable<Map.Entry<String, WorkflowToken>>() {
382393
@Override
383394
public Map.Entry<String, WorkflowToken> call() throws Exception {
384-
WorkflowToken copiedToken = ((BasicWorkflowToken) token).deepCopy();
385-
executeAll(branch.iterator(), appSpec, instantiator, classLoader, copiedToken);
386-
return Maps.immutableEntry(branch.toString(), copiedToken);
395+
// Set the logging context on the worker thread
396+
try (LoggingContextAccessor.LoggingContextRestorer ignored =
397+
parentLoggingContext != null ? LoggingContextAccessor.setLoggingContext(parentLoggingContext)
398+
: null) {
399+
WorkflowToken copiedToken = ((BasicWorkflowToken) token).deepCopy();
400+
executeAll(branch.iterator(), appSpec, instantiator, classLoader, copiedToken);
401+
return Maps.immutableEntry(branch.toString(), copiedToken);
402+
}
387403
}
388404
});
389405
}

0 commit comments

Comments
 (0)