4949
5050import org .apache .cassandra .concurrent .ScheduledExecutorPlus ;
5151import org .apache .cassandra .config .DatabaseDescriptor ;
52+ import org .apache .cassandra .config .DurationSpec ;
53+ import org .apache .cassandra .config .MutationTrackingSpec ;
5254import org .apache .cassandra .cql3 .UntypedResultSet ;
5355import org .apache .cassandra .db .ConsistencyLevel ;
5456import org .apache .cassandra .db .DecoratedKey ;
8789import org .apache .cassandra .tcm .ownership .ReplicaGroups ;
8890import org .apache .cassandra .tcm .ownership .VersionedEndpoints ;
8991import org .apache .cassandra .utils .FBUtilities ;
92+ import org .apache .cassandra .utils .MBeanWrapper ;
9093
9194import static com .google .common .base .Preconditions .checkNotNull ;
9295import static java .lang .String .format ;
9699
97100// TODO (expected): persistence (handle restarts)
98101// TODO (expected): handle topology changes
99- public class MutationTrackingService
102+ public class MutationTrackingService implements MutationTrackingServiceMBean
100103{
104+ public static final String MBEAN_NAME = "org.apache.cassandra.db:type=MutationTrackingService" ;
101105 public static final String DISABLED_MESSAGE = "Mutation tracking is not enabled. (See mutation_tracking.enabled in cassandra.yaml)" ;
102106
103107 private static final MutationTrackingService instance ;
104108 private static final ScheduledExecutorPlus executor ;
105109
110+ private static final MutationTrackingSpec config ;
111+
106112 static
107113 {
108- if (DatabaseDescriptor .getMutationTrackingEnabled ())
114+ config = DatabaseDescriptor .getMutationTrackingConfig ();
115+
116+ if (config .enabled )
109117 {
110118 instance = new MutationTrackingService ();
111119 executor = executorFactory ().scheduled ("Mutation-Tracking-Service" , NORMAL );
120+ MBeanWrapper .instance .registerMBean (instance , MBEAN_NAME );
112121 }
113122 else
114123 {
@@ -129,12 +138,12 @@ public static MutationTrackingService instance()
129138
130139 public static boolean isEnabled ()
131140 {
132- return DatabaseDescriptor . getMutationTrackingEnabled () ;
141+ return config . enabled ;
133142 }
134143
135144 public static void ensureEnabled ()
136145 {
137- if (!DatabaseDescriptor . getMutationTrackingEnabled () )
146+ if (!config . enabled )
138147 throw new IllegalStateException (DISABLED_MESSAGE );
139148 }
140149
@@ -188,6 +197,7 @@ public static void shutdown() throws InterruptedException
188197 private final ReplicatedOffsetsBroadcaster offsetsBroadcaster = new ReplicatedOffsetsBroadcaster ();
189198 private final LogStatePersister offsetsPersister = new LogStatePersister ();
190199 private final ActiveLogReconciler activeReconciler = new ActiveLogReconciler ();
200+ private final BackgroundReconciler backgroundReconciler = new BackgroundReconciler ();
191201
192202 private final IncomingMutations incomingMutations = new IncomingMutations ();
193203 private final OutgoingMutations outgoingMutations = new OutgoingMutations ();
@@ -223,14 +233,53 @@ private synchronized void startInternal(ClusterMetadata metadata)
223233
224234 onNewClusterMetadata (null , metadata );
225235
236+ if (!keyspaceShards .isEmpty () && !config .background_reconciliation_enabled )
237+ logBackgroundReconciliationDisabledWarning (keyspaceShards .keySet ());
238+
226239 offsetsBroadcaster .start ();
227240 offsetsPersister .start ();
241+ backgroundReconciler .start ();
228242
229243 ExpiredStatePurger .instance .register (incomingMutations );
230244
231245 started = true ;
232246 }
233247
248+ @ Override
249+ public void setMutationTrackingBackgroundReconciliationEnabled (boolean enabled )
250+ {
251+ if (enabled != config .background_reconciliation_enabled )
252+ {
253+ logger .info ("{} mutation tracking background reconciliation" , enabled ? "Enabling" : "Disabling" );
254+ config .background_reconciliation_enabled = enabled ;
255+ }
256+ }
257+
258+ @ Override
259+ public boolean getMutationTrackingBackgroundReconciliationEnabled ()
260+ {
261+ return config .background_reconciliation_enabled ;
262+ }
263+
264+ @ Override
265+ public void setMutationTrackingBackgroundReconciliationIntervalMilliseconds (long intervalMilliseconds )
266+ {
267+ if (intervalMilliseconds != config .background_reconciliation_interval .toMilliseconds ())
268+ {
269+ DurationSpec .LongMillisecondsBound backgroundReconciliationInterval =
270+ new DurationSpec .LongMillisecondsBound (intervalMilliseconds , TimeUnit .MILLISECONDS );
271+ logger .info ("Setting mutation tracking background reconciliation interval from {} to {}" ,
272+ config .background_reconciliation_interval , backgroundReconciliationInterval );
273+ config .background_reconciliation_interval = backgroundReconciliationInterval ;
274+ }
275+ }
276+
277+ @ Override
278+ public long getMutationTrackingBackgroundReconciliationIntervalMilliseconds ()
279+ {
280+ return config .background_reconciliation_interval .toMilliseconds ();
281+ }
282+
234283 public void pauseOffsetBroadcast (boolean pause )
235284 {
236285 offsetsBroadcaster .pauseOffsetBroadcast (pause );
@@ -862,6 +911,14 @@ private void onNewClusterMetadata(@Nullable ClusterMetadata prev, ClusterMetadat
862911 // recalculating the shards will repopulate this via the existing callbacks
863912 log2ShardMap = new ConcurrentHashMap <>();
864913 keyspaceShards = applyUpdatedMetadata (keyspaceShards , prev , next , this ::nextLogId , this ::onNewLog );
914+
915+ if (!config .background_reconciliation_enabled )
916+ {
917+ Set <String > newKeyspaces = new HashSet <>(keyspaceShards .keySet ());
918+ newKeyspaces .removeAll (originalKeyspaceShards .keySet ());
919+ if (!newKeyspaces .isEmpty ())
920+ logBackgroundReconciliationDisabledWarning (newKeyspaces );
921+ }
865922 }
866923 catch (Throwable t )
867924 {
@@ -1047,6 +1104,12 @@ private static List<SyncTask> unwrapped(Collection<SyncTask> tasks)
10471104 return unwrapped ;
10481105 }
10491106
1107+ private void logBackgroundReconciliationDisabledWarning (Set <String > keyspaces )
1108+ {
1109+ logger .warn ("Background reconciliation is disabled but mutation tracking keyspaces exist: {}. " +
1110+ "Unreconciled mutations will not be automatically repaired in the background." , keyspaces );
1111+ }
1112+
10501113 public static class KeyspaceShards
10511114 {
10521115 private enum UpdateDecision
@@ -1374,6 +1437,93 @@ static int loadHostLogIdFromSystemTable()
13741437 return rows .one ().getInt ("host_log_id" );
13751438 }
13761439
1440+ private static class BackgroundReconciler
1441+ {
1442+ void start ()
1443+ {
1444+ scheduleNext ();
1445+ }
1446+
1447+ private void scheduleNext ()
1448+ {
1449+ long intervalMillis = config .background_reconciliation_interval .toMilliseconds ();
1450+ executor .schedule (this ::runAndReschedule , intervalMillis , TimeUnit .MILLISECONDS );
1451+ }
1452+
1453+ private void runAndReschedule ()
1454+ {
1455+ try
1456+ {
1457+ run ();
1458+ }
1459+ finally
1460+ {
1461+ scheduleNext ();
1462+ }
1463+ }
1464+
1465+ void run ()
1466+ {
1467+ MutationTrackingService .instance ().forEachKeyspace (this ::run );
1468+ }
1469+
1470+ private void run (KeyspaceShards shards )
1471+ {
1472+ if (config .background_reconciliation_enabled )
1473+ shards .forEachShard (this ::run );
1474+ }
1475+
1476+ private void run (Shard shard )
1477+ {
1478+ try
1479+ {
1480+ List <Offsets .Immutable > missing = shard .collectLocallyMissingOffsets ();
1481+ if (missing .isEmpty ()) return ;
1482+
1483+ for (Offsets .Immutable offsets : missing )
1484+ {
1485+ // Prefer pulling from the coordinator
1486+ int coordinatorHostId = offsets .logId ().hostId ();
1487+ InetAddressAndPort coordinator = ClusterMetadata .current ().directory .endpoint (new NodeId (coordinatorHostId ));
1488+ InetAddressAndPort pullFrom = FailureDetector .instance .isAlive (coordinator )
1489+ ? coordinator
1490+ : findAliveReplica (shard , coordinatorHostId );
1491+ if (pullFrom == null )
1492+ {
1493+ logger .debug ("No coordinator or replica is available to process the pull mutation request for missing offset {}" ,
1494+ offsets );
1495+ continue ; // No reachable source
1496+ }
1497+
1498+ // TODO (expected): backoff, rate limits, per host and total
1499+ PullMutationsRequest request = new PullMutationsRequest (offsets );
1500+ logger .trace ("Requesting pull mutation request from replica {} for missing offset {}" , pullFrom , offsets );
1501+ MessagingService .instance ().send (Message .out (Verb .PULL_MUTATIONS_REQ , request ), pullFrom );
1502+ }
1503+ }
1504+ catch (Throwable throwable )
1505+ {
1506+ // Avoid throwing an exception in the reconciliation step to prevent the scheduled task from
1507+ // being killed
1508+ logger .error ("Exception encountered during background reconciliation of shard={}" , shard , throwable );
1509+ }
1510+ }
1511+
1512+ private InetAddressAndPort findAliveReplica (Shard shard , int excludeHostId )
1513+ {
1514+ for (InetAddressAndPort replica : shard .remoteReplicas ())
1515+ {
1516+ int replicaId = ClusterMetadata .current ().directory .peerId (replica ).id ();
1517+ if (replicaId != excludeHostId && FailureDetector .instance .isAlive (replica ))
1518+ {
1519+ logger .trace ("Found alive replica {} with replica id {}" , replica , replicaId );
1520+ return replica ;
1521+ }
1522+ }
1523+ return null ;
1524+ }
1525+ }
1526+
13771527 // TODO (later): a more intelligent heuristic for offsets included in broadcasts
13781528 private static class ReplicatedOffsetsBroadcaster
13791529 {
@@ -1509,6 +1659,24 @@ public void resumeActiveReconcilerRegularPriority()
15091659 activeReconciler .resumeRegularPriorityForTesting ();
15101660 }
15111661
1662+ @ VisibleForTesting
1663+ public void reconcileForTesting ()
1664+ {
1665+ backgroundReconciler .run ();
1666+ }
1667+
1668+ @ VisibleForTesting
1669+ public void pauseBackgroundReconciler ()
1670+ {
1671+ config .background_reconciliation_enabled = false ;
1672+ }
1673+
1674+ @ VisibleForTesting
1675+ public void resumeBackgroundReconciler ()
1676+ {
1677+ config .background_reconciliation_enabled = true ;
1678+ }
1679+
15121680 @ VisibleForTesting
15131681 public static class TestAccess
15141682 {
0 commit comments