Skip to content

Commit 2bb26e7

Browse files
jcleezerJason Leezer
andauthored
Clear DarkCluster Buffer on Refresh (#1132)
* Clear DarkCluster Buffers On Refresh * Improvements * Simplify * Additional unit test, remove logs, add back comment * Fix test * Bump version and update CHANGELOG --------- Co-authored-by: Jason Leezer <jleezer@linkedin.com>
1 parent 922f8b3 commit 2bb26e7

File tree

7 files changed

+161
-5
lines changed

7 files changed

+161
-5
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ and what APIs have changed, if applicable.
1414

1515
## [Unreleased]
1616

17+
## [29.81.2] - 2026-01-09
18+
- Clear DarkCluster Buffer on Refresh
19+
1720
## [29.81.1] - 2025-11-04
1821
- Fix xds client active wait time metric also add more logs.
1922

@@ -5933,7 +5936,8 @@ patch operations can re-use these classes for generating patch messages.
59335936

59345937
## [0.14.1]
59355938

5936-
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.81.1...master
5939+
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.81.2...master
5940+
[29.81.2]: https://github.com/linkedin/rest.li/compare/v29.81.1...v29.81.2
59375941
[29.81.1]: https://github.com/linkedin/rest.li/compare/v29.81.0...v29.81.1
59385942
[29.81.0]: https://github.com/linkedin/rest.li/compare/v29.80.3...v29.81.0
59395943
[29.80.3]: https://github.com/linkedin/rest.li/compare/v29.80.2...v29.80.3

darkcluster/src/main/java/com/linkedin/darkcluster/api/DarkClusterStrategy.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,9 @@ public interface DarkClusterStrategy
3333
* @return true if at least one request was sent.
3434
*/
3535
boolean handleRequest(final RestRequest originalRequest, final RestRequest darkRequest, final RequestContext requestContext);
36+
37+
default void shutdown()
38+
{
39+
// no-op by default
40+
}
3641
}

darkcluster/src/main/java/com/linkedin/darkcluster/impl/ConstantQpsDarkClusterStrategy.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ public boolean handleRequest(RestRequest originalRequest, RestRequest darkReques
8181
return addRequest(originalRequest, darkRequest, requestContext);
8282
}
8383

84+
@Override
85+
public void shutdown()
86+
{
87+
_rateLimiter.cancelAll(new RuntimeException("Shutting down ConstantQpsDarkClusterStrategy"));
88+
}
89+
8490
/**
8591
* We won't create this strategy if this config isn't valid for this strategy. For instance, we don't want to create
8692
* the ConstantQpsDarkClusterStrategy if any of the configurables are zero, because we'd be doing pointless work on every getOrCreate.
@@ -155,6 +161,7 @@ private float getSendRate()
155161
*/
156162
private boolean addRequest(RestRequest originalRequest, RestRequest darkRequest, RequestContext requestContext)
157163
{
164+
final BaseDarkClusterDispatcher baseDispatcher = _baseDarkClusterDispatcher;
158165
_rateLimiter.submit(new Callback<None>()
159166
{
160167
@Override
@@ -166,7 +173,7 @@ public void onError(Throwable e)
166173
@Override
167174
public void onSuccess(None result)
168175
{
169-
_baseDarkClusterDispatcher.sendRequest(originalRequest, darkRequest, requestContext, NUM_REQUESTS_TO_SEND_PER_RATE_LIMITER_CYCLE);
176+
baseDispatcher.sendRequest(originalRequest, darkRequest, requestContext, NUM_REQUESTS_TO_SEND_PER_RATE_LIMITER_CYCLE);
170177
}
171178
});
172179
return true;

darkcluster/src/main/java/com/linkedin/darkcluster/impl/DarkClusterStrategyFactoryImpl.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,14 +278,30 @@ public void onSuccess(DarkClusterConfigMap updatedDarkConfigMap)
278278

279279
for (int partitionId : partitions)
280280
{
281+
281282
Map<String, DarkClusterStrategy> darkStrategyMap = new ConcurrentHashMap<>();
282283
for (Map.Entry<String, DarkClusterConfig> entry : updatedDarkConfigMap.entrySet())
283284
{
284285
String darkClusterToAdd = entry.getKey();
285286
darkStrategyMap.put(darkClusterToAdd, createStrategy(darkClusterToAdd, entry.getValue(), partitionId));
286-
LOG.info("Created new strategy for dark cluster: " + darkClusterToAdd + ", partition: " + partitionId + ", source cluster: " + _sourceClusterName);
287+
LOG.info("Created new strategy for dark cluster: " + darkClusterToAdd + ", partition: " + partitionId
288+
+ ", source cluster: " + _sourceClusterName);
287289
}
288-
_partitionToDarkStrategyMap.put(partitionId, darkStrategyMap);
290+
_partitionToDarkStrategyMap.compute(partitionId, (key, oldDarkStrategyMap) -> {
291+
// Shutdown any old strategies that are being replaced.
292+
if (oldDarkStrategyMap != null)
293+
{
294+
for (Map.Entry<String, DarkClusterStrategy> oldEntry : oldDarkStrategyMap.entrySet())
295+
{
296+
final DarkClusterStrategy oldStrategy = oldEntry.getValue();
297+
if (oldStrategy != null)
298+
{
299+
oldStrategy.shutdown();
300+
}
301+
}
302+
}
303+
return darkStrategyMap;
304+
});
289305
}
290306
}
291307
});
@@ -300,6 +316,20 @@ public void onClusterRemoved(String clusterName)
300316
{
301317
if (_sourceClusterName.equals(clusterName))
302318
{
319+
for (Map.Entry<Integer, Map<String, DarkClusterStrategy>> entry : _partitionToDarkStrategyMap.entrySet())
320+
{
321+
Map<String, DarkClusterStrategy> strategies = entry.getValue();
322+
if (strategies != null)
323+
{
324+
for (DarkClusterStrategy strategy : strategies.values())
325+
{
326+
if (strategy != null)
327+
{
328+
strategy.shutdown();
329+
}
330+
}
331+
}
332+
}
303333
_partitionToDarkStrategyMap.clear();
304334
_sourceClusterPresent = false;
305335
}

darkcluster/src/test/java/com/linkedin/darkcluster/TestConstantQpsDarkClusterStrategy.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,14 @@
3535
import java.util.List;
3636
import java.util.function.Supplier;
3737
import java.util.stream.IntStream;
38+
import org.mockito.ArgumentCaptor;
39+
import org.mockito.Mockito;
3840
import org.testng.Assert;
3941
import org.testng.annotations.DataProvider;
4042
import org.testng.annotations.Test;
4143

44+
import static org.mockito.Mockito.verify;
45+
4246
public class TestConstantQpsDarkClusterStrategy
4347
{
4448
private static final String SOURCE_CLUSTER_NAME = "FooCluster";
@@ -176,4 +180,35 @@ static EvictingCircularBuffer getBuffer(Clock clock)
176180
{
177181
return new EvictingCircularBuffer(TEST_CAPACITY, TEST_TTL, TEST_TTL_UNIT, clock);
178182
}
183+
184+
@Test
185+
public void testShutdown()
186+
{
187+
// Setup mocks
188+
ConstantQpsRateLimiter mockRateLimiter = Mockito.mock(ConstantQpsRateLimiter.class);
189+
BaseDarkClusterDispatcherImpl mockDispatcher = Mockito.mock(BaseDarkClusterDispatcherImpl.class);
190+
MockClusterInfoProvider mockClusterInfoProvider = new MockClusterInfoProvider();
191+
mockClusterInfoProvider.putHttpsClusterCount(SOURCE_CLUSTER_NAME, 10);
192+
mockClusterInfoProvider.putHttpsClusterCount(DARK_CLUSTER_NAME_ONE, 5);
193+
194+
// Create strategy instance
195+
ConstantQpsDarkClusterStrategy strategy = new ConstantQpsDarkClusterStrategy(
196+
SOURCE_CLUSTER_NAME,
197+
DARK_CLUSTER_NAME_ONE,
198+
50.0f,
199+
mockDispatcher,
200+
new DoNothingNotifier(),
201+
mockClusterInfoProvider,
202+
mockRateLimiter);
203+
204+
// Call shutdown
205+
strategy.shutdown();
206+
207+
// Verify that cancelAll was called with a RuntimeException containing the expected message.
208+
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
209+
verify(mockRateLimiter).cancelAll(throwableCaptor.capture());
210+
Throwable throwable = throwableCaptor.getValue();
211+
Assert.assertTrue(throwable instanceof RuntimeException);
212+
Assert.assertEquals(throwable.getMessage(), "Shutting down ConstantQpsDarkClusterStrategy");
213+
}
179214
}

darkcluster/src/test/java/com/linkedin/darkcluster/TestDarkClusterStrategyFactory.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.Executors;
2727
import java.util.concurrent.ScheduledExecutorService;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicInteger;
2930

3031
import com.linkedin.d2.DarkClusterConfig;
3132
import com.linkedin.d2.DarkClusterStrategyNameArray;
@@ -36,6 +37,7 @@
3637
import com.linkedin.darkcluster.api.DarkClusterStrategy;
3738
import com.linkedin.darkcluster.api.DarkClusterStrategyFactory;
3839
import com.linkedin.darkcluster.api.NoOpDarkClusterStrategy;
40+
import com.linkedin.darkcluster.impl.ConstantQpsDarkClusterStrategy;
3941
import com.linkedin.darkcluster.impl.RelativeTrafficMultiplierDarkClusterStrategy;
4042
import com.linkedin.darkcluster.impl.DarkClusterStrategyFactoryImpl;
4143
import com.linkedin.darkcluster.impl.DefaultDarkClusterDispatcher;
@@ -314,6 +316,79 @@ public void testStrategyZeroMultiplier()
314316
Assert.assertTrue(strategy instanceof NoOpDarkClusterStrategy);
315317
}
316318

319+
@Test
320+
public void testRefreshShutsDownExistingStrategiesPerPartition()
321+
{
322+
// Build real per-partition strategies (like testUpdateStrategyDarkClusterChange), then refresh and assert the old
323+
// strategies were shut down.
324+
//
325+
// Note: RelativeTrafficMultiplierDarkClusterStrategy has no observable shutdown behavior. ConstantQpsDarkClusterStrategy
326+
// does (it calls ConstantQpsRateLimiter.cancelAll), so we use CONSTANT_QPS here with a tracking rate limiter.
327+
MockClusterInfoProvider clusterInfoProvider = new MockClusterInfoProvider();
328+
Facilities facilities = new MockFacilities(clusterInfoProvider);
329+
DarkClusterDispatcher darkClusterDispatcher = new DefaultDarkClusterDispatcher(new MockClient(false));
330+
ClockedExecutor executor = new ClockedExecutor();
331+
332+
AtomicInteger cancelAllInvocations = new AtomicInteger(0);
333+
Supplier<ConstantQpsRateLimiter> rateLimiterSupplier = () ->
334+
new TrackingConstantQpsRateLimiter(executor, executor, executor,
335+
TestConstantQpsDarkClusterStrategy.getBuffer(executor), cancelAllInvocations);
336+
337+
DarkClusterStrategyFactory strategyFactory = new DarkClusterStrategyFactoryImpl(facilities,
338+
SOURCE_CLUSTER_NAME,
339+
darkClusterDispatcher,
340+
new DoNothingNotifier(),
341+
new Random(SEED),
342+
new CountingVerifierManager(),
343+
rateLimiterSupplier);
344+
strategyFactory.start();
345+
346+
DarkClusterStrategyNameArray strategyList = new DarkClusterStrategyNameArray();
347+
strategyList.addAll(Collections.singletonList(CONSTANT_QPS));
348+
DarkClusterConfig constantQpsConfig = new DarkClusterConfig()
349+
.setDarkClusterStrategyPrioritizedList(strategyList)
350+
.setDispatcherOutboundTargetRate(1.0f)
351+
.setDispatcherMaxRequestsToBuffer(10)
352+
.setDispatcherBufferedRequestExpiryInSeconds(10);
353+
clusterInfoProvider.addDarkClusterConfig(SOURCE_CLUSTER_NAME, DARK_CLUSTER_NAME, constantQpsConfig);
354+
355+
// Trigger a refresh to build partition 0, then lazily create partition 1 so refresh will rebuild both.
356+
clusterInfoProvider.notifyListenersClusterAdded(SOURCE_CLUSTER_NAME);
357+
DarkClusterStrategy oldP0 = strategyFactory.get(DARK_CLUSTER_NAME, 0);
358+
DarkClusterStrategy oldP1 = strategyFactory.get(DARK_CLUSTER_NAME, 1);
359+
Assert.assertTrue(oldP0 instanceof ConstantQpsDarkClusterStrategy);
360+
Assert.assertTrue(oldP1 instanceof ConstantQpsDarkClusterStrategy);
361+
362+
// Refresh again: should rebuild both partitions and shut down the old strategies.
363+
clusterInfoProvider.notifyListenersClusterAdded(SOURCE_CLUSTER_NAME);
364+
Assert.assertEquals(cancelAllInvocations.get(), 2,
365+
"Expected existing strategies to be shut down once per partition on refresh");
366+
367+
Assert.assertTrue(strategyFactory.get(DARK_CLUSTER_NAME, 0) instanceof ConstantQpsDarkClusterStrategy);
368+
Assert.assertTrue(strategyFactory.get(DARK_CLUSTER_NAME, 1) instanceof ConstantQpsDarkClusterStrategy);
369+
}
370+
371+
private static final class TrackingConstantQpsRateLimiter extends ConstantQpsRateLimiter
372+
{
373+
private final AtomicInteger _cancelAllInvocations;
374+
375+
TrackingConstantQpsRateLimiter(ScheduledExecutorService scheduler,
376+
java.util.concurrent.Executor executor,
377+
com.linkedin.util.clock.Clock clock,
378+
com.linkedin.r2.transport.http.client.EvictingCircularBuffer callbackBuffer,
379+
AtomicInteger cancelAllInvocations)
380+
{
381+
super(scheduler, executor, clock, callbackBuffer);
382+
_cancelAllInvocations = cancelAllInvocations;
383+
}
384+
385+
@Override
386+
public void cancelAll(Throwable throwable)
387+
{
388+
_cancelAllInvocations.incrementAndGet();
389+
}
390+
}
391+
317392
private static class DeletingClusterListener implements LoadBalancerClusterListener
318393
{
319394
// handle to MockClusterInfoProvider so it can call triggerCluster actions.

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=29.81.1
1+
version=29.81.2
22
group=com.linkedin.pegasus
33
org.gradle.configureondemand=true
44
org.gradle.parallel=true

0 commit comments

Comments
 (0)