Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void updateActionForDeployment(Map<String, Object> newConfig, Deployment
// if the update is cancelled, don't perform merge
if (totallyCompleteFuture.isCancelled()) {
logger.atInfo(MERGE_CONFIG_EVENT_KEY).kv("deployment", deploymentId)
.log("Future was cancelled so no need to go through with the update");
.log("Deployment was cancelled, so no need to perform config merge update");
return;
}
Map<String, Object> serviceConfig;
Expand Down
21 changes: 11 additions & 10 deletions src/main/java/com/aws/greengrass/deployment/DeploymentService.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import lombok.Setter;
import org.apache.commons.io.FileUtils;
import software.amazon.awssdk.iot.iotjobs.model.JobStatus;
import software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction;

import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -504,10 +505,12 @@ private void cancelCurrentDeployment() {
.isCancellable()) {
logger.atInfo().log("Deployment already finished processing or cannot be cancelled");
} else {
boolean canCancelDeployment = context.get(UpdateSystemPolicyService.class).discardPendingUpdateAction(
Copy link
Copy Markdown
Member Author

@saranyailla saranyailla Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, this returns true for the deployments that do not have the update policy set to DeploymentComponentUpdatePolicyAction.NOTIFY_COMPONENTS which is wrong. We should discard an update only when the policy is set to notify.

((DefaultDeploymentTask) currentDeploymentTaskMetadata.getDeploymentTask()).getDeployment()
.getGreengrassDeploymentId());
if (canCancelDeployment) {
DeploymentComponentUpdatePolicyAction currentDeploymentUpdatePolicyAction =
currentDeploymentTaskMetadata.getDeploymentDocument().getComponentUpdatePolicy()
.getComponentUpdatePolicyAction();
if (DeploymentComponentUpdatePolicyAction.NOTIFY_COMPONENTS.equals(currentDeploymentUpdatePolicyAction)
&& context.get(UpdateSystemPolicyService.class)
.discardPendingUpdateAction(currentDeploymentTaskMetadata.getGreengrassDeploymentId())) {
currentDeploymentTaskMetadata.getDeploymentResultFuture().cancel(true);
DeploymentType deploymentType = currentDeploymentTaskMetadata.getDeploymentType();
if (DeploymentType.SHADOW.equals(deploymentType) || DeploymentType.LOCAL.equals(deploymentType)) {
Expand All @@ -522,13 +525,11 @@ private void cancelCurrentDeployment() {
.kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME,
currentDeploymentTaskMetadata.getGreengrassDeploymentId())
.log("Deployment was cancelled");
} else {
logger.atInfo().kv(DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getDeploymentId())
.kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME,
currentDeploymentTaskMetadata.getGreengrassDeploymentId())
.log("Deployment is in a stage where it cannot be cancelled,"
+ " need to wait for it to finish");
return;
}
logger.atInfo().kv(DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getDeploymentId())
.kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getGreengrassDeploymentId())
.log("Deployment is in a stage where it cannot be cancelled, need to wait for it to finish");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class UpdateSystemPolicyService extends GreengrassService {
// represents the value in seconds the kernel will wait for components to respond to
// an precomponent update event
private final Map<String, UpdateAction> pendingActions = Collections.synchronizedMap(new LinkedHashMap<>());
private final AtomicReference<String> actionInProgress = new AtomicReference<>();

@Inject
private LifecycleIPCEventStreamAgent lifecycleIPCAgent;
Expand Down Expand Up @@ -96,20 +95,21 @@ public Set<String> getPendingActions() {
@SuppressWarnings("PMD.AvoidCatchingThrowable")
protected void runUpdateActions(String deploymentId) {
try (LockScope ls = LockScope.lock(lock)) {
for (Map.Entry<String, UpdateAction> todo : pendingActions.entrySet()) {
try {
actionInProgress.set(todo.getKey());
todo.getValue().getAction().run();
logger.atDebug().setEventType("service-update-action").addKeyValue("action", todo.getKey()).log();
} catch (Throwable t) {
logger.atError().setEventType("service-update-action-error").addKeyValue("action", todo.getKey())
.setCause(t).log();
}
final UpdateAction pendingUpdateAction = pendingActions.remove(deploymentId);
Copy link
Copy Markdown
Member Author

@saranyailla saranyailla Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix: Pending actions that only correspond to the specified deployment id should be performed.
Fix and refactor: Rely on pendingActions map to check if the update action is in progress/completed or never added.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that a deferred deployment cannot be cancelled? Ideally, if a component is deferring a config merge due a deployment, this deployment should still be cancellable given that defer period is still ongoing.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Deferred deployments are still cancellable. That code path is not modified.
  • runUpdateActions is run only after we waited enough for the deferred time.
  • While we're waiting for the deferred time, deployment pending actions can be discarded upon request.

if (pendingUpdateAction == null) {
// Update action is never added or discarded by this time. So, do nothing.
return;
}
try {
pendingUpdateAction.getAction().run();
logger.atDebug().setEventType("service-update-action").addKeyValue("action", deploymentId).log();
} catch (Throwable t) {
logger.atError().setEventType("service-update-action-error").addKeyValue("action", deploymentId)
.setCause(t).log();
}
pendingActions.clear();
lifecycleIPCAgent.sendPostComponentUpdateEvent(
new PostComponentUpdateEvent().withDeploymentId(deploymentId));
actionInProgress.set(null);

}
}

Expand All @@ -121,16 +121,13 @@ protected void runUpdateActions(String deploymentId) {
* false if update actions were already in progress
*/
public boolean discardPendingUpdateAction(String tag) {
if (tag.equals(actionInProgress.get())) {
final UpdateAction pendingUpdateAction = pendingActions.remove(tag);
if (pendingUpdateAction == null) {
// Update action is never added or already in progress.
return false;
}
final UpdateAction pendingUpdateAction = pendingActions.get(tag);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix - race: A thread can reach this point before the update action is completed (but in progress) resulting in returning an incorrect value for discardPendingUpdateAction and sending two post update events (one via runUpdateActions and other via discardPendingUpdateAction). This is because 1/ runUpdateActionsremoves the action from the map only after it is completed 2/ No locking used while accessing/modifying the map .

Use remove to ensure that the updated state is reflected across threads.

if (pendingUpdateAction != null) {
// Signal components that they can resume their work since the update is not going to happen
lifecycleIPCAgent.sendPostComponentUpdateEvent(
new PostComponentUpdateEvent().withDeploymentId(pendingUpdateAction.getDeploymentId()));
pendingActions.remove(tag);
}
// Signal components that they can resume their work since the update is not going to happen
lifecycleIPCAgent.sendPostComponentUpdateEvent(new PostComponentUpdateEvent().withDeploymentId(tag));
return true;
}

Expand All @@ -150,18 +147,17 @@ protected void startup() throws InterruptedException {
logger.atDebug().setEventType("service-update-pending").addKeyValue("numOfUpdates", pendingActions.size())
.log();

boolean ggcRestarting = false;
for (UpdateAction action : pendingActions.values()) {
if (action.isGgcRestart()) {
ggcRestarting = true;
break;
}
final AtomicReference<UpdateAction> action = new AtomicReference<>();
pendingActions.values().stream().findFirst().ifPresent(action::set);
if (action.get() == null) {
// no pending actions
continue;
}
String deploymentId = action.get().getDeploymentId();
PreComponentUpdateEvent preComponentUpdateEvent = new PreComponentUpdateEvent()
.withDeploymentId(deploymentId)
.withIsGgcRestarting(action.get().isGgcRestart());

PreComponentUpdateEvent preComponentUpdateEvent = new PreComponentUpdateEvent();
preComponentUpdateEvent.setIsGgcRestarting(ggcRestarting);
String deploymentId = pendingActions.values().stream().map(UpdateAction::getDeploymentId).findFirst().get();
Copy link
Copy Markdown
Member Author

@saranyailla saranyailla Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix: At this point, pendingActions can be empty and result in NPE. So, we send component updates only if there's an entry by invoking a consumer with ifPresent.

preComponentUpdateEvent.setDeploymentId(deploymentId);
List<Future<DeferComponentUpdateRequest>> deferRequestFutures =
lifecycleIPCAgent.sendPreComponentUpdateEvent(preComponentUpdateEvent);

Expand All @@ -179,8 +175,7 @@ protected void startup() throws InterruptedException {
logger.atInfo().setEventType("service-update-finish").log();
}).get();
} catch (ExecutionException e) {
logger.atError().setEventType("service-update-error")
.log("Run update actions errored", e);
logger.atError().setEventType("service-update-error").log("Run update actions errored", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,34 @@ void GIVEN_deployment_job_cancelled_WHEN_waiting_for_safe_time_THEN_then_cancel_

String deploymentDocument = getTestDeploymentDocument();

deploymentQueue.offer(new Deployment(deploymentDocument,
Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1));

when(mockExecutorService.submit(any(DefaultDeploymentTask.class))).thenReturn(mockFuture);
startDeploymentServiceInAnotherThread();

// Simulate a cancellation deployment
Thread.sleep(TEST_DEPLOYMENT_POLLING_FREQUENCY.toMillis()); // wait for previous deployment to be polled
deploymentQueue.offer(new Deployment(Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1, true));

// Expecting three invocations, once for each retry attempt
verify(mockExecutorService, WAIT_FOUR_SECONDS).submit(any(DefaultDeploymentTask.class));
verify(deploymentStatusKeeper, WAIT_FOUR_SECONDS).persistAndPublishDeploymentStatus(eq(TEST_JOB_ID_1),
eq(TEST_UUID), eq(TEST_CONFIGURATION_ARN), eq(Deployment.DeploymentType.IOT_JOBS),
eq(JobStatus.IN_PROGRESS.toString()), any(), eq(EXPECTED_ROOT_PACKAGE_LIST));
verify(updateSystemPolicyService, times(0)).discardPendingUpdateAction(TEST_DEPLOYMENT_ID);
verify(mockFuture, times(0)).cancel(true);
}

@Test
void GIVEN_deployment_job_with_notify_cancelled_WHEN_waiting_for_safe_time_THEN_then_cancel_deployment()
throws Exception {
Topics groupToLastDeploymentTopics = Topics.of(context, GROUP_TO_LAST_DEPLOYMENT_TOPICS, null);
when(config.lookupTopics(eq(GROUP_TO_LAST_DEPLOYMENT_TOPICS), anyString())).thenReturn(
groupToLastDeploymentTopics);

String deploymentDocument = getTestDeploymentDocumentNotify();

deploymentQueue.offer(new Deployment(deploymentDocument,
Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1));

Expand Down Expand Up @@ -645,6 +673,36 @@ void GIVEN_deployment_job_cancelled_WHEN_already_executing_update_THEN_then_fini

String deploymentDocument = getTestDeploymentDocument();

deploymentQueue.offer(new Deployment(deploymentDocument,
Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1));

when(mockExecutorService.submit(any(DefaultDeploymentTask.class))).thenReturn(mockFuture);
startDeploymentServiceInAnotherThread();

// Simulate a cancellation deployment
Thread.sleep(TEST_DEPLOYMENT_POLLING_FREQUENCY.toMillis()); // wait for previous deployment to be polled
deploymentQueue.offer(new Deployment(Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1, true));

// Expecting three invocations, once for each retry attempt
verify(mockExecutorService, WAIT_FOUR_SECONDS).submit(any(DefaultDeploymentTask.class));
// ComponentUpdatePolicy is set to SKIP_NOTIFY_COMPONENTS
verify(updateSystemPolicyService, times(0)).discardPendingUpdateAction(TEST_DEPLOYMENT_ID);
verify(mockFuture, times(0)).cancel(true);
verify(deploymentStatusKeeper, WAIT_FOUR_SECONDS).persistAndPublishDeploymentStatus(eq(TEST_JOB_ID_1),
eq(TEST_UUID), eq(TEST_CONFIGURATION_ARN), eq(Deployment.DeploymentType.IOT_JOBS),
eq(JobStatus.IN_PROGRESS.toString()), any(), eq(EXPECTED_ROOT_PACKAGE_LIST));
}


@Test
void GIVEN_deployment_job_with_notify_cancelled_WHEN_already_executing_update_THEN_then_finish_deployment()
throws Exception {
Topics groupToLastDeploymentTopics = Topics.of(context, GROUP_TO_LAST_DEPLOYMENT_TOPICS, null);
when(config.lookupTopics(eq(GROUP_TO_LAST_DEPLOYMENT_TOPICS), anyString())).thenReturn(
groupToLastDeploymentTopics);

String deploymentDocument = getTestDeploymentDocumentNotify();

deploymentQueue.offer(new Deployment(deploymentDocument,
Deployment.DeploymentType.IOT_JOBS, TEST_JOB_ID_1));

Expand Down Expand Up @@ -713,6 +771,13 @@ String getTestDeploymentDocument() {
.collect(Collectors.joining("\n")).replace(CONFIG_ARN_PLACEHOLDER, TEST_CONFIGURATION_ARN);
}

String getTestDeploymentDocumentNotify() {
return new BufferedReader(new InputStreamReader(
getClass().getResourceAsStream("TestDeploymentDocNotify.json"), StandardCharsets.UTF_8))
.lines()
.collect(Collectors.joining("\n")).replace(CONFIG_ARN_PLACEHOLDER, TEST_CONFIGURATION_ARN);
}

private void assertListEquals(List<String> first, List<String> second) {
assertEquals(first.size(), second.size());
for (int i = 0; i < first.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"deploymentId": "testDeploymentId",
"configurationArn": "arn:aws:greengrass:us-east-1:12345678910:configuration:thinggroup/group1:1",
"components": {
"component1": {
"rootComponent": true,
"version": "1.0.0"
}
},
"componentUpdatePolicy": {
"timeout": 60,
"action": "NOTIFY_COMPONENTS"
},
"configurationValidationPolicy": {
"timeout": 20
}
}