Skip to content

Commit b3fc8ae

Browse files
fix premature max-attempts from register() racing in-flight list()
In tick(), push each due job's nextCheckAt forward immediately after incrementing attempts. Without this, a register() arriving while the first tick's list() is still in flight would call rescheduleIfNeeded with stale nextCheckAt values in the past, re-firing a tick that re-marked the same jobs as due and double-counted their attempts. Concurrent registrations against a slow list() could burn through maxAttempts in milliseconds. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 39362fa commit b3fc8ae

2 files changed

Lines changed: 85 additions & 0 deletions

File tree

core/src/main/java/org/mineskin/JobBatchChecker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ private void tick() {
9797
pending.remove(p.jobInfo.id());
9898
continue;
9999
}
100+
// Push nextCheckAt forward immediately so that a concurrent reschedule
101+
// (e.g. from register()) arriving before the async response doesn't see
102+
// this job as still-due and double-count its attempts. The response
103+
// handler will overwrite with a correct `responseTime + interval`.
104+
p.nextCheckAt = now + options.interval().getInterval(attempt);
100105
due.add(p);
101106
}
102107

tests/src/test/java/test/JobBatchCheckerTest.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,68 @@ public void usesIndividualGetsWhenFewPending() throws Exception {
138138
}
139139
}
140140

141+
@Test
142+
public void doesNotDoubleCountAttemptsDuringInflightList() throws Exception {
143+
FakeQueueClient queue = new FakeQueueClient();
144+
MineSkinClient client = new FakeMineSkinClient(queue);
145+
for (int i = 0; i < 8; i++) {
146+
queue.setStatus("job" + i, JobStatus.WAITING, null);
147+
}
148+
149+
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
150+
try {
151+
// maxAttempts=3: if register() during an in-flight list() retriggers a tick
152+
// that re-marks the original jobs as due, their attempts climb by one per
153+
// racing register. Three rapid registers past maxAttempts=3 would trip
154+
// "Max attempts reached" without the tick-side push-forward.
155+
JobCheckOptions options = JobCheckOptions.create(scheduler)
156+
.withInitialDelay(30)
157+
.withInterval(RequestInterval.constant(40))
158+
.withMaxAttempts(3);
159+
JobBatchChecker checker = new JobBatchChecker(client, options);
160+
161+
// Hold every list() response so the first tick's request stays in-flight
162+
// the entire time new registrations land.
163+
queue.holdAllListCalls = true;
164+
165+
List<CompletableFuture<JobReference>> futures = new ArrayList<>();
166+
for (int i = 0; i < 5; i++) {
167+
futures.add(checker.register(new JobInfo("job" + i, JobStatus.WAITING, System.currentTimeMillis(), null)));
168+
}
169+
170+
// Wait until the first tick fires and stashes a list() call.
171+
long deadline = System.currentTimeMillis() + 500;
172+
while (queue.listCalls.get() < 1 && System.currentTimeMillis() < deadline) {
173+
Thread.sleep(5);
174+
}
175+
assertTrue(queue.listCalls.get() >= 1, "first tick should have called list()");
176+
177+
// Race window: each of these registrations triggers rescheduleIfNeeded while
178+
// the held list hasn't released; the sleep lets any scheduled tick actually fire.
179+
for (int i = 5; i < 8; i++) {
180+
futures.add(checker.register(new JobInfo("job" + i, JobStatus.WAITING, System.currentTimeMillis(), null)));
181+
Thread.sleep(20);
182+
}
183+
184+
// Mark everything COMPLETED and release all held list responses.
185+
for (int i = 0; i < 8; i++) {
186+
queue.setStatus("job" + i, JobStatus.COMPLETED, "uuid-" + i);
187+
}
188+
queue.releaseAllHeldLists();
189+
190+
// All futures should resolve normally. Without the tick-side push-forward,
191+
// the original 5 would have tripped max-attempts during the race.
192+
for (int i = 0; i < 8; i++) {
193+
JobReference ref = awaitFuture(futures.get(i), 2000);
194+
assertEquals("job" + i, ref.getJob().id());
195+
assertFalse(futures.get(i).isCompletedExceptionally(),
196+
"future " + i + " should not have hit max attempts");
197+
}
198+
} finally {
199+
scheduler.shutdownNow();
200+
}
201+
}
202+
141203
private static JobReference awaitFuture(CompletableFuture<JobReference> future, long timeoutMs)
142204
throws InterruptedException, TimeoutException, ExecutionException {
143205
try {
@@ -168,11 +230,24 @@ private static final class FakeQueueClient implements QueueClient {
168230
final AtomicInteger listCalls = new AtomicInteger();
169231
final Map<String, Integer> getCallsForId = new ConcurrentHashMap<>();
170232
private final Map<String, JobInfo> state = new ConcurrentHashMap<>();
233+
volatile boolean holdAllListCalls = false;
234+
private final List<CompletableFuture<JobListResponse>> heldListFutures =
235+
Collections.synchronizedList(new ArrayList<>());
171236

172237
void setStatus(String id, JobStatus status, String result) {
173238
state.put(id, new JobInfo(id, status, System.currentTimeMillis(), result));
174239
}
175240

241+
void releaseAllHeldLists() {
242+
List<JobInfo> snapshot = new ArrayList<>(state.values());
243+
synchronized (heldListFutures) {
244+
for (CompletableFuture<JobListResponse> fut : heldListFutures) {
245+
fut.complete(new FakeJobListResponse(snapshot));
246+
}
247+
heldListFutures.clear();
248+
}
249+
}
250+
176251
@Override
177252
public CompletableFuture<QueueResponse> submit(GenerateRequest request) {
178253
throw new UnsupportedOperationException();
@@ -194,6 +269,11 @@ public CompletableFuture<JobResponse> get(String id) {
194269
@Override
195270
public CompletableFuture<JobListResponse> list() {
196271
listCalls.incrementAndGet();
272+
if (holdAllListCalls) {
273+
CompletableFuture<JobListResponse> fut = new CompletableFuture<>();
274+
heldListFutures.add(fut);
275+
return fut;
276+
}
197277
List<JobInfo> snapshot = new ArrayList<>(state.values());
198278
return CompletableFuture.completedFuture(new FakeJobListResponse(snapshot));
199279
}

0 commit comments

Comments
 (0)