Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -60,6 +61,7 @@ public class DeploymentConfigMerger {
public static final String MERGE_ERROR_LOG_EVENT_KEY = "config-update-error";
public static final String DEPLOYMENT_ID_LOG_KEY = "deploymentId";
public static final String SERVICE_NAME_LOG_KEY = "serviceName";
public static final String SOURCE_IOT_DATA_ENDPOINT_KEY = "sourceIotDataEndpoint";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the data endpoint is a device configuration and not related to the deployment configuration merger. Please move this to the device configuration class.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we need sourceIotCredEndpoint?

protected static final int WAIT_SVC_START_POLL_INTERVAL_MILLISEC = 1000;

private static final Logger logger = LogManager.getLogger(DeploymentConfigMerger.class);
Expand Down Expand Up @@ -154,6 +156,22 @@ private void updateActionForDeployment(Map<String, Object> newConfig, Deployment
return;
}

// Persist source endpoint before activation for endpoint-switch deployments.
// Stored under services.DeploymentService.runtime (internal deployment state, not customer config).
// Persisted to config.tlog so it survives device crashes mid-endpoint-switch.
// Step 5 uses this value to report deployment status back to the source account.
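// Cleared after status reporting; a stale key is overwritten by the next switch.
// NOTE(review): while a non-empty value remains under this key, the activator's isEndpointSwitch()
// returns true and forces config snapshot/rollback for any deployment, so a stale key is not
// entirely harmless -- confirm the cleanup path always runs.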
if (isEndpointSwitchDeployment(nucleusConfig, deviceConfiguration)) {
String currentDataEndpoint = Coerce.toString(deviceConfiguration.getIotDataEndpoint());
String newDataEndpoint = Coerce.toString(nucleusConfig.get(DEVICE_PARAM_IOT_DATA_ENDPOINT));
logger.atInfo().setEventType(MERGE_CONFIG_EVENT_KEY)
.kv("currentIotDataEndpoint", currentDataEndpoint)
.kv("newIotDataEndpoint", newDataEndpoint)
.log("Endpoint switch deployment detected, persisting source endpoint for rollback");
kernel.getContext().get(DeploymentService.class).getRuntimeConfig()
.lookup(SOURCE_IOT_DATA_ENDPOINT_KEY).withValue(currentDataEndpoint);
}

logger.atInfo(MERGE_CONFIG_EVENT_KEY).kv("deployment", deploymentId)
.log("Applying deployment changes");
activator.activate(newConfig, deployment, configMergeTimestamp, totallyCompleteFuture);
Expand Down Expand Up @@ -253,6 +271,22 @@ private String tryGetIoTDataEndpointFromNewConfig(Map<String, Object> kernelConf
return iotDataEndpoint;
}

/**
 * Tell whether the incoming nucleus configuration carries an IoT data endpoint that differs from
 * the endpoint currently held by the device configuration.
 *
 * <p>The current endpoint is only read when the incoming map actually contains the
 * {@code DEVICE_PARAM_IOT_DATA_ENDPOINT} key; both values are compared as coerced strings.
 *
 * @param nucleusConfig the incoming nucleus configuration map, may be null
 * @param deviceConfiguration the current device configuration
 * @return true if the map contains the IoT data endpoint key and its value is not equal to the
 *         current endpoint; false if the map is null, lacks the key, or the values are equal
 */
static boolean isEndpointSwitchDeployment(Map<String, Object> nucleusConfig,
                                          DeviceConfiguration deviceConfiguration) {
    boolean endpointProvided =
            nucleusConfig != null && nucleusConfig.containsKey(DEVICE_PARAM_IOT_DATA_ENDPOINT);
    if (!endpointProvided) {
        // Nothing to compare against: the deployment does not touch the data endpoint.
        return false;
    }
    // Arguments are evaluated left to right: current endpoint first, then the incoming one.
    boolean sameEndpoint = Objects.equals(
            Coerce.toString(deviceConfiguration.getIotDataEndpoint()),
            Coerce.toString(nucleusConfig.get(DEVICE_PARAM_IOT_DATA_ENDPOINT)));
    return !sameEndpoint;
}

@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@

package com.aws.greengrass.deployment.activator;

import com.aws.greengrass.config.Topic;
import com.aws.greengrass.deployment.DeploymentConfigMerger;
import com.aws.greengrass.deployment.DeploymentService;
import com.aws.greengrass.deployment.exceptions.ServiceUpdateException;
import com.aws.greengrass.deployment.model.Deployment;
import com.aws.greengrass.deployment.model.DeploymentDocument;
import com.aws.greengrass.deployment.model.DeploymentResult;
import com.aws.greengrass.lifecyclemanager.GreengrassService;
import com.aws.greengrass.lifecyclemanager.Kernel;
import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException;
import com.aws.greengrass.util.Coerce;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -49,7 +52,18 @@ public void activate(Map<String, Object> newConfig, Deployment deployment, long
}

DeploymentDocument deploymentDocument = deployment.getDeploymentDocumentObj();
if (isAutoRollbackRequested(deploymentDocument) && !takeConfigSnapshot(totallyCompleteFuture)) {
boolean autoRollback = isAutoRollbackRequested(deploymentDocument);

// Endpoint switch deployments force config snapshot and rollback regardless of FailureHandlingPolicy,
// so that we can restore the original endpoint if the new one is unreachable after applying the change.
// Only check when auto-rollback is not already requested to avoid redundant config store reads.
boolean forceSnapshotForEndpointSwitch = !autoRollback && isEndpointSwitch();
if (forceSnapshotForEndpointSwitch) {
logger.atInfo(MERGE_CONFIG_EVENT_KEY)
.log("Endpoint switch deployment: forcing config snapshot and rollback support");
}
if ((autoRollback || forceSnapshotForEndpointSwitch)
&& !takeConfigSnapshot(totallyCompleteFuture)) {
return;
}

Expand Down Expand Up @@ -114,7 +128,9 @@ private void handleFailure(DeploymentConfigMerger.AggregateServicesChangeManager
Throwable failureCause) {
logger.atError(MERGE_CONFIG_EVENT_KEY).kv(DEPLOYMENT_ID_LOG_KEY, deploymentDocument.getDeploymentId())
.setCause(failureCause).log("Deployment failed");
if (isAutoRollbackRequested(deploymentDocument)) {
if (isAutoRollbackRequested(deploymentDocument) || isEndpointSwitch()) {
logger.atInfo(MERGE_CONFIG_EVENT_KEY).kv(DEPLOYMENT_ID_LOG_KEY, deploymentDocument.getDeploymentId())
.log("Initiating rollback for failed deployment");
rollback(deploymentDocument, totallyCompleteFuture, failureCause,
servicesChangeManager.createRollbackManager());
} else {
Expand Down Expand Up @@ -180,6 +196,17 @@ void rollback(DeploymentDocument deploymentDocument, CompletableFuture<Deploymen
}
}

/**
 * Check whether a source IoT data endpoint is currently stored in the DeploymentService runtime
 * config under {@link DeploymentConfigMerger#SOURCE_IOT_DATA_ENDPOINT_KEY}.
 *
 * <p>NOTE(review): the result depends only on a non-empty value being present under that key, not
 * on the contents of the deployment being activated -- confirm the key is reliably cleared once an
 * endpoint-switch deployment has finished.
 *
 * @return true if the topic exists and its coerced string value is non-null and non-empty
 */
private boolean isEndpointSwitch() {
    Topic sourceEndpointTopic = kernel.getContext().get(DeploymentService.class).getRuntimeConfig()
            .find(DeploymentConfigMerger.SOURCE_IOT_DATA_ENDPOINT_KEY);
    if (sourceEndpointTopic == null) {
        // Key was never written (find does not create it).
        return false;
    }
    String sourceEndpoint = Coerce.toString(sourceEndpointTopic);
    return sourceEndpoint != null && !sourceEndpoint.isEmpty();
}

private void handleFailureRollback(CompletableFuture totallyCompleteFuture, Throwable deploymentFailureCause,
Throwable rollbackFailureCause) {
// Rollback execution failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import static software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction.NOTIFY_COMPONENTS;
import static software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction.SKIP_NOTIFY_COMPONENTS;


@SuppressWarnings({"PMD.CouplingBetweenObjects", "PMD.CloseResource"})
@ExtendWith({GGExtension.class, MockitoExtension.class})
class DeploymentConfigMergerTest {

Expand All @@ -93,12 +95,22 @@ class DeploymentConfigMergerTest {
@Mock
private ExecutorService executorService;
@Mock
private DeploymentDirectoryManager deploymentDirectoryManager;
@Mock
private DeploymentService deploymentService;
@Mock
private Topics runtimeTopics;
@Mock
private Context context;

@BeforeEach
void beforeEach() {
    // All stubs are lenient because not every test exercises every collaborator.

    // Validation passes by default.
    lenient().when(validator.validate(anyMap(), any(), any())).thenReturn(true);

    // The kernel hands out the mocked context, which in turn resolves the mocked services.
    lenient().when(kernel.getContext()).thenReturn(context);
    lenient().when(context.get(DeploymentDirectoryManager.class)).thenReturn(deploymentDirectoryManager);
    lenient().when(context.get(DeploymentService.class)).thenReturn(deploymentService);

    // Any lookup in the DeploymentService runtime config yields a throwaway mock topic.
    lenient().when(deploymentService.getRuntimeConfig()).thenReturn(runtimeTopics);
    lenient().when(runtimeTopics.lookup(any(String.class))).thenReturn(mock(Topic.class));
}

@AfterEach
Expand Down Expand Up @@ -590,4 +602,95 @@ private static <T> Set<T> newOrderedSet(T... objs) {
Collections.addAll(set, objs);
return set;
}

@Test
void GIVEN_data_endpoint_changed_WHEN_isEndpointSwitchDeployment_THEN_returns_true() {
    // Device is currently configured with the "old" data endpoint.
    Topic currentEndpoint = Topic.of(context, DEVICE_PARAM_IOT_DATA_ENDPOINT,
            "old-ats.iot.us-east-1.amazonaws.com");
    when(deviceConfiguration.getIotDataEndpoint()).thenReturn(currentEndpoint);

    // Incoming nucleus config points at a different data endpoint.
    Map<String, Object> incomingConfig = new HashMap<>();
    incomingConfig.put(DEVICE_PARAM_IOT_DATA_ENDPOINT, "new-ats.iot.us-west-2.amazonaws.com");

    boolean endpointSwitch =
            DeploymentConfigMerger.isEndpointSwitchDeployment(incomingConfig, deviceConfiguration);
    assertTrue(endpointSwitch);
}

@Test
void GIVEN_cred_endpoint_only_changed_WHEN_isEndpointSwitchDeployment_THEN_returns_false() {
    // Only the credential endpoint key is present; the data endpoint key is absent.
    Map<String, Object> incomingConfig = new HashMap<>();
    incomingConfig.put(DEVICE_PARAM_IOT_CRED_ENDPOINT, "new.credentials.iot.us-west-2.amazonaws.com");

    boolean endpointSwitch =
            DeploymentConfigMerger.isEndpointSwitchDeployment(incomingConfig, deviceConfiguration);
    assertFalse(endpointSwitch);
}

@Test
void GIVEN_data_endpoint_unchanged_WHEN_isEndpointSwitchDeployment_THEN_returns_false() {
    // The same endpoint value is used for both the current and the incoming configuration.
    String sharedEndpoint = "same-ats.iot.us-east-1.amazonaws.com";
    Topic currentEndpoint = Topic.of(context, DEVICE_PARAM_IOT_DATA_ENDPOINT, sharedEndpoint);
    when(deviceConfiguration.getIotDataEndpoint()).thenReturn(currentEndpoint);

    Map<String, Object> incomingConfig = new HashMap<>();
    incomingConfig.put(DEVICE_PARAM_IOT_DATA_ENDPOINT, sharedEndpoint);

    boolean endpointSwitch =
            DeploymentConfigMerger.isEndpointSwitchDeployment(incomingConfig, deviceConfiguration);
    assertFalse(endpointSwitch);
}

@Test
void GIVEN_no_endpoint_keys_WHEN_isEndpointSwitchDeployment_THEN_returns_false() {
    // The incoming config carries an unrelated key only (region), no endpoint keys.
    Map<String, Object> incomingConfig = new HashMap<>();
    incomingConfig.put(DEVICE_PARAM_AWS_REGION, "us-east-1");

    boolean endpointSwitch =
            DeploymentConfigMerger.isEndpointSwitchDeployment(incomingConfig, deviceConfiguration);
    assertFalse(endpointSwitch);
}

@Test
void GIVEN_null_config_WHEN_isEndpointSwitchDeployment_THEN_returns_false() {
    // A missing nucleus config must be tolerated and reported as "no switch".
    Map<String, Object> absentConfig = null;

    boolean endpointSwitch =
            DeploymentConfigMerger.isEndpointSwitchDeployment(absentConfig, deviceConfiguration);
    assertFalse(endpointSwitch);
}

@Test
void GIVEN_endpoint_switch_deployment_WHEN_updateAction_THEN_source_endpoints_persisted_before_activate()
        throws Throwable {
    // The activator resolved through the context is a mock so that activation can be verified.
    DeploymentActivatorFactory activatorFactory = mock(DeploymentActivatorFactory.class);
    DeploymentActivator activator = mock(DeploymentActivator.class);
    when(activatorFactory.getDeploymentActivator(any())).thenReturn(activator);
    when(context.get(DeploymentActivatorFactory.class)).thenReturn(activatorFactory);

    // Current device configuration: the "old" data and credential endpoints.
    // Topics are created before the when(...) calls so stubbing is not interleaved with Topic.of.
    Topic currentDataEndpoint = Topic.of(context, DEVICE_PARAM_IOT_DATA_ENDPOINT,
            "old-ats.iot.us-east-1.amazonaws.com");
    Topic currentCredEndpoint = Topic.of(context, DEVICE_PARAM_IOT_CRED_ENDPOINT,
            "old.credentials.iot.us-east-1.amazonaws.com");
    when(deviceConfiguration.getIotDataEndpoint()).thenReturn(currentDataEndpoint);
    when(deviceConfiguration.getIotCredentialEndpoint()).thenReturn(currentCredEndpoint);
    when(deviceConfiguration.getNucleusComponentName()).thenReturn(DEFAULT_NUCLEUS_COMPONENT_NAME);

    // Incoming config: services -> <nucleus> -> configuration -> iotDataEndpoint = new endpoint.
    Map<String, Object> nucleusConfiguration = new HashMap<>();
    nucleusConfiguration.put(DEVICE_PARAM_IOT_DATA_ENDPOINT, "new-ats.iot.us-west-2.amazonaws.com");
    Map<String, Object> nucleusService = new HashMap<>();
    nucleusService.put(CONFIGURATION_CONFIG_KEY, nucleusConfiguration);
    Map<String, Object> services = new HashMap<>();
    services.put(DEFAULT_NUCLEUS_COMPONENT_NAME, nucleusService);
    Map<String, Object> incomingConfig = new HashMap<>();
    incomingConfig.put(SERVICES_NAMESPACE_TOPIC, services);

    // Dedicated topic for the source endpoint key so the persisted value can be verified.
    Topic persistedSourceEndpoint = mock(Topic.class);
    when(runtimeTopics.lookup(DeploymentConfigMerger.SOURCE_IOT_DATA_ENDPOINT_KEY))
            .thenReturn(persistedSourceEndpoint);

    DeploymentDocument deploymentDocument = mock(DeploymentDocument.class);
    lenient().when(deploymentDocument.getDeploymentId()).thenReturn("DeploymentId");
    when(deploymentDocument.getComponentUpdatePolicy())
            .thenReturn(new ComponentUpdatePolicy(0, SKIP_NOTIFY_COMPONENTS));

    DeploymentConfigMerger configMerger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator,
            executorService);
    configMerger.mergeInNewConfig(createMockDeployment(deploymentDocument), incomingConfig,
            System.currentTimeMillis());

    // executorService.execute() is called for SKIP_NOTIFY_COMPONENTS; run the captured task inline.
    ArgumentCaptor<Runnable> submittedTask = ArgumentCaptor.forClass(Runnable.class);
    verify(executorService).execute(submittedTask.capture());
    submittedTask.getValue().run();

    // The old data endpoint was written to the runtime topic and the activator was invoked.
    verify(persistedSourceEndpoint).withValue("old-ats.iot.us-east-1.amazonaws.com");
    verify(activator).activate(any(), any(), any(Long.class), any());
}

}
Loading
Loading