Skip to content

Commit 7927ac4

Browse files
Matt Byrdbeobal
authored andcommitted
Bring back the ability to Optionally avoid hint transfer during decommission (CASSANDRA-17808)
lost due to merge of ae08423 patch by Matt Byrd; reviewed by Stefan Miklosovic and Sam Tunnicliffe for CASSANDRA-21341
1 parent eabcce9 commit 7927ac4

3 files changed

Lines changed: 29 additions & 12 deletions

File tree

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
6.0-alpha2
2+
* Restore option to avoid hint transfer during decommission (CASSANDRA-21341)
23
* Add an offline cluster metadata tool (CASSANDRA-19151)
34
* Accord: Tail Latency Improvements (CASSANDRA-21361)
45
* Artificial Latency Injection (CASSANDRA-17024)

src/java/org/apache/cassandra/tcm/sequences/UnbootstrapStreams.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
import org.slf4j.LoggerFactory;
3131

3232
import org.apache.cassandra.batchlog.BatchlogManager;
33+
import org.apache.cassandra.config.DatabaseDescriptor;
3334
import org.apache.cassandra.db.SystemKeyspace;
3435
import org.apache.cassandra.dht.Range;
3536
import org.apache.cassandra.dht.Token;
37+
import org.apache.cassandra.hints.HintsService;
3638
import org.apache.cassandra.locator.EndpointsByReplica;
3739
import org.apache.cassandra.locator.InetAddressAndPort;
3840
import org.apache.cassandra.locator.RangesAtEndpoint;
@@ -50,6 +52,7 @@
5052
import org.apache.cassandra.tcm.ownership.MovementMap;
5153
import org.apache.cassandra.tcm.ownership.PlacementDeltas;
5254
import org.apache.cassandra.utils.concurrent.Future;
55+
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
5356

5457
public class UnbootstrapStreams implements LeaveStreams
5558
{
@@ -125,9 +128,20 @@ private static void unbootstrap(Keyspaces keyspaces, MovementMap movements) thro
125128
logger.debug("waiting for batch log processing.");
126129
batchlogReplay.get();
127130

128-
logger.info("streaming hints to other nodes");
131+
Future<?> hintsSuccess = ImmediateFuture.success(null);
129132

130-
Future<?> hintsSuccess = StorageService.instance.streamHints();
133+
if (DatabaseDescriptor.getTransferHintsOnDecommission())
134+
{
135+
logger.info("streaming hints to other nodes");
136+
hintsSuccess = StorageService.instance.streamHints();
137+
}
138+
else
139+
{
140+
logger.info("pausing dispatch and deleting hints");
141+
DatabaseDescriptor.setHintedHandoffEnabled(false);
142+
HintsService.instance.pauseDispatch();
143+
HintsService.instance.deleteAllHints();
144+
}
131145

132146
// wait for the transfer runnables to signal the latch.
133147
logger.debug("waiting for stream acks.");

test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,25 +71,27 @@ public void shouldAvoidHintTransferOnDecommission() throws Exception
7171
{
7272
cluster.schemaChange(withKeyspace("CREATE TABLE %s.decom_no_hints_test (key int PRIMARY KEY, value int)"));
7373

74-
cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.decom_no_hints_test (key, value) VALUES (?, ?)"), ALL, 0, 0);
75-
long hintsBeforeShutdown = countTotalHints(cluster.get(1));
74+
int secondNode = 2;
75+
cluster.coordinator(secondNode).execute(withKeyspace("INSERT INTO %s.decom_no_hints_test (key, value) VALUES (?, ?)"), ALL, 0, 0);
76+
long hintsBeforeShutdown = countTotalHints(cluster.get(secondNode));
7677
assertThat(hintsBeforeShutdown).isEqualTo(0);
77-
long hintsDelivered = countHintsDelivered(cluster.get(1));
78+
long hintsDelivered = countHintsDelivered(cluster.get(secondNode));
7879
assertThat(hintsDelivered).isEqualTo(0);
7980

8081
// Shutdown node 3 so hints can be written against it.
8182
cluster.get(3).shutdown().get();
8283

83-
cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.decom_no_hints_test (key, value) VALUES (?, ?)"), TWO, 0, 0);
84-
Awaitility.await().until(() -> countTotalHints(cluster.get(1)) > 0);
85-
long hintsAfterShutdown = countTotalHints(cluster.get(1));
84+
cluster.coordinator(secondNode).execute(withKeyspace("INSERT INTO %s.decom_no_hints_test (key, value) VALUES (?, ?)"), TWO, 0, 0);
85+
Awaitility.await().until(() -> countTotalHints(cluster.get(secondNode)) > 0);
86+
long hintsAfterShutdown = countTotalHints(cluster.get(secondNode));
8687
assertThat(hintsAfterShutdown).isEqualTo(1);
8788

88-
cluster.get(2).runOnInstance(() -> setProgressBarrierMinConsistencyLevel(org.apache.cassandra.db.ConsistencyLevel.ONE));
89+
cluster.get(secondNode).runOnInstance(() -> setProgressBarrierMinConsistencyLevel(org.apache.cassandra.db.ConsistencyLevel.ONE));
90+
8991
ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(1), 3);
90-
cluster.get(2).nodetoolResult("decommission", "--force").asserts().success();
91-
long hintsDeliveredByDecom = countHintsDelivered(cluster.get(2));
92-
String mode = cluster.get(2).callOnInstance(() -> StorageService.instance.getOperationMode());
92+
cluster.get(secondNode).nodetoolResult("decommission", "--force").asserts().success();
93+
long hintsDeliveredByDecom = countHintsDelivered(cluster.get(secondNode));
94+
String mode = cluster.get(secondNode).callOnInstance(() -> StorageService.instance.getOperationMode());
9395
assertEquals(StorageService.Mode.DECOMMISSIONED.toString(), mode);
9496
assertThat(hintsDeliveredByDecom).isEqualTo(0);
9597
}

0 commit comments

Comments
 (0)