Skip to content

Commit 91ee301

Browse files
authored
Fix duplicate scheduling in procedure execution (#17902) (#17969)
* Fix duplicate scheduling in procedure execution * Fix delayed procedure deduplication and semaphore release * Fix SQL parser error handler traversal * Fix pipe procedure lock release race * Fix procedure lock wait scheduling (cherry picked from commit c25849a) (cherry picked from commit 0a45a3b)
1 parent 4e06a94 commit 91ee301

27 files changed

Lines changed: 433 additions & 89 deletions

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,9 @@ public class PipeTaskCoordinatorLock {
4343
public void lock() {
4444
LOGGER.debug(
4545
"PipeTaskCoordinator lock waiting for thread {}", Thread.currentThread().getName());
46-
try {
47-
semaphore.acquire();
48-
LOGGER.debug(
49-
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
50-
} catch (final InterruptedException e) {
51-
Thread.currentThread().interrupt();
52-
LOGGER.error(
53-
"Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}",
54-
Thread.currentThread().getName());
55-
}
46+
semaphore.acquireUninterruptibly();
47+
LOGGER.debug(
48+
"PipeTaskCoordinator lock acquired by thread {}", Thread.currentThread().getName());
5649
}
5750

5851
public boolean tryLock() {

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.ArrayList;
3636
import java.util.Arrays;
3737
import java.util.List;
38+
import java.util.concurrent.atomic.AtomicBoolean;
3839
import java.util.concurrent.atomic.AtomicReference;
3940

4041
/**
@@ -60,6 +61,7 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
6061
private volatile long lastUpdate;
6162

6263
private final AtomicReference<byte[]> result = new AtomicReference<>();
64+
private final AtomicBoolean executing = new AtomicBoolean(false);
6365
private volatile boolean locked = false;
6466
private boolean lockedWhenLoading = false;
6567

@@ -233,6 +235,16 @@ protected void releaseLock(Env env) {
233235
// no op
234236
}
235237

238+
/**
239+
* Called after an execution attempt returns {@link ProcedureLockState#LOCK_EVENT_WAIT}. Override
240+
* it to put the procedure into the corresponding lock wait queue.
241+
*
242+
* @param env env
243+
*/
244+
protected void waitForLock(Env env) {
245+
// no op
246+
}
247+
236248
/**
237249
* Used to keep procedure lock even when the procedure is yielded or suspended.
238250
*
@@ -254,6 +266,14 @@ protected boolean isYieldAfterExecution(Env env) {
254266
}
255267

256268
// -------------------------Internal methods - called by the procedureExecutor------------------
269+
final boolean tryAcquireExecution() {
270+
return executing.compareAndSet(false, true);
271+
}
272+
273+
final void releaseExecution() {
274+
executing.set(false);
275+
}
276+
257277
/**
258278
* Internal method called by the ProcedureExecutor that starts the user-level code execute().
259279
*

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java

Lines changed: 76 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.iotdb.commons.concurrent.ThreadName;
2323
import org.apache.iotdb.commons.utils.TestOnly;
24-
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
2524
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
2625
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
2726
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
@@ -40,7 +39,6 @@
4039
import java.util.Deque;
4140
import java.util.HashSet;
4241
import java.util.List;
43-
import java.util.Objects;
4442
import java.util.Set;
4543
import java.util.concurrent.ConcurrentHashMap;
4644
import java.util.concurrent.CopyOnWriteArrayList;
@@ -82,6 +80,16 @@ public class ProcedureExecutor<Env> {
8280
private final Env environment;
8381
private final IProcedureStore<Env> store;
8482

83+
private static final class LockStateResult<Env> {
84+
private final ProcedureLockState lockState;
85+
private final Procedure<Env> procedure;
86+
87+
private LockStateResult(ProcedureLockState lockState, Procedure<Env> procedure) {
88+
this.lockState = lockState;
89+
this.procedure = procedure;
90+
}
91+
}
92+
8593
public ProcedureExecutor(
8694
final Env environment, final IProcedureStore<Env> store, final ProcedureScheduler scheduler) {
8795
this.environment = environment;
@@ -320,32 +328,38 @@ private void executeProcedure(Procedure<Env> proc) {
320328
return;
321329
}
322330
ProcedureLockState lockState = null;
331+
Procedure<Env> lockEventWaitProcedure = null;
323332
try {
324333
do {
325334
if (!rootProcStack.acquire()) {
326335
if (rootProcStack.setRollback()) {
327-
lockState = executeRootStackRollback(rootProcId, rootProcStack);
336+
LockStateResult<Env> lockStateResult =
337+
executeRootStackRollback(rootProcId, rootProcStack);
338+
lockState = lockStateResult.lockState;
328339
switch (lockState) {
329340
case LOCK_ACQUIRED:
330341
break;
331342
case LOCK_EVENT_WAIT:
332-
LOG.info("LOCK_EVENT_WAIT rollback {}", proc);
343+
LOG.info("LOCK_EVENT_WAIT rollback {}", lockStateResult.procedure);
333344
rootProcStack.unsetRollback();
345+
lockEventWaitProcedure = lockStateResult.procedure;
334346
break;
335347
case LOCK_YIELD_WAIT:
336348
rootProcStack.unsetRollback();
337-
scheduler.yield(proc);
349+
scheduler.yield(lockStateResult.procedure);
338350
break;
339351
default:
340352
throw new UnsupportedOperationException();
341353
}
342354
} else {
343355
if (!proc.wasExecuted()) {
344-
switch (executeRollback(proc)) {
356+
lockState = executeRollback(proc);
357+
switch (lockState) {
345358
case LOCK_ACQUIRED:
346359
break;
347360
case LOCK_EVENT_WAIT:
348361
LOG.info("LOCK_EVENT_WAIT can't rollback child running for {}", proc);
362+
lockEventWaitProcedure = proc;
349363
break;
350364
case LOCK_YIELD_WAIT:
351365
scheduler.yield(proc);
@@ -357,19 +371,25 @@ private void executeProcedure(Procedure<Env> proc) {
357371
}
358372
break;
359373
}
360-
lockState = acquireLock(proc);
361-
switch (lockState) {
362-
case LOCK_ACQUIRED:
363-
executeProcedure(rootProcStack, proc);
364-
break;
365-
case LOCK_YIELD_WAIT:
366-
case LOCK_EVENT_WAIT:
367-
LOG.info("{} lockstate is {}", proc, lockState);
368-
break;
369-
default:
370-
throw new UnsupportedOperationException();
374+
try {
375+
lockState = acquireLock(proc);
376+
switch (lockState) {
377+
case LOCK_ACQUIRED:
378+
executeProcedure(rootProcStack, proc);
379+
break;
380+
case LOCK_YIELD_WAIT:
381+
case LOCK_EVENT_WAIT:
382+
LOG.info("{} lockstate is {}", proc, lockState);
383+
if (lockState == ProcedureLockState.LOCK_EVENT_WAIT) {
384+
lockEventWaitProcedure = proc;
385+
}
386+
break;
387+
default:
388+
throw new UnsupportedOperationException();
389+
}
390+
} finally {
391+
rootProcStack.release();
371392
}
372-
rootProcStack.release();
373393

374394
if (proc.isSuccess()) {
375395
// update metrics on finishing the procedure
@@ -387,9 +407,9 @@ private void executeProcedure(Procedure<Env> proc) {
387407
} finally {
388408
// Only after procedure has completed execution can it be allowed to be rescheduled to prevent
389409
// data races
390-
if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) {
391-
LOG.info("procedureId {} wait for lock.", proc.getProcId());
392-
((ConfigNodeProcedureEnv) this.environment).getNodeLock().waitProcedure(proc);
410+
if (lockEventWaitProcedure != null) {
411+
LOG.info("procedureId {} wait for lock.", lockEventWaitProcedure.getProcId());
412+
lockEventWaitProcedure.waitForLock(this.environment);
393413
}
394414
}
395415
}
@@ -404,6 +424,7 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
404424
if (proc.getState() != ProcedureState.RUNNABLE) {
405425
LOG.error(
406426
"The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc);
427+
releaseLock(proc, false);
407428
return;
408429
}
409430
boolean reExecute;
@@ -570,8 +591,8 @@ private void yieldProcedure(Procedure<Env> proc) {
570591
* @param procedureStack root procedure stack
571592
* @return lock state
572593
*/
573-
private ProcedureLockState executeRootStackRollback(
574-
Long rootProcId, RootProcedureStack procedureStack) {
594+
private LockStateResult<Env> executeRootStackRollback(
595+
Long rootProcId, RootProcedureStack<Env> procedureStack) {
575596
Procedure<Env> rootProcedure = procedures.get(rootProcId);
576597
ProcedureException exception = rootProcedure.getException();
577598
if (exception == null) {
@@ -590,19 +611,19 @@ private ProcedureLockState executeRootStackRollback(
590611
}
591612
ProcedureLockState lockState = acquireLock(procedure);
592613
if (lockState != ProcedureLockState.LOCK_ACQUIRED) {
593-
return lockState;
614+
return new LockStateResult<>(lockState, procedure);
594615
}
595616
lockState = executeRollback(procedure);
596617
releaseLock(procedure, false);
597618

598619
boolean abortRollback = lockState != ProcedureLockState.LOCK_ACQUIRED;
599620
abortRollback |= !isRunning() || !store.isRunning();
600621
if (abortRollback) {
601-
return lockState;
622+
return new LockStateResult<>(lockState, procedure);
602623
}
603624

604625
if (!procedure.isFinished() && procedure.isYieldAfterExecution(this.environment)) {
605-
return ProcedureLockState.LOCK_YIELD_WAIT;
626+
return new LockStateResult<>(ProcedureLockState.LOCK_YIELD_WAIT, procedure);
606627
}
607628

608629
if (procedure != rootProcedure) {
@@ -612,7 +633,7 @@ private ProcedureLockState executeRootStackRollback(
612633

613634
LOG.info("Rolled back {}, time duration is {}", rootProcedure, rootProcedure.elapsedTime());
614635
rootProcedureCleanup(rootProcedure);
615-
return ProcedureLockState.LOCK_ACQUIRED;
636+
return new LockStateResult<>(ProcedureLockState.LOCK_ACQUIRED, rootProcedure);
616637
}
617638

618639
private ProcedureLockState acquireLock(Procedure<Env> proc) {
@@ -728,16 +749,33 @@ public void run() {
728749
Thread.sleep(1000);
729750
continue;
730751
}
731-
this.activeProcedure.set(procedure);
732-
activeExecutorCount.incrementAndGet();
733-
startTime.set(System.currentTimeMillis());
734-
executeProcedure(procedure);
735-
activeExecutorCount.decrementAndGet();
736-
LOG.trace(
737-
"Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
738-
this.activeProcedure.set(null);
739-
lastUpdated = System.currentTimeMillis();
740-
startTime.set(lastUpdated);
752+
boolean executionAcquired = false;
753+
while (isRunning() && !(executionAcquired = procedure.tryAcquireExecution())) {
754+
Thread.sleep(10);
755+
}
756+
if (!executionAcquired) {
757+
continue;
758+
}
759+
try {
760+
this.activeProcedure.set(procedure);
761+
activeExecutorCount.incrementAndGet();
762+
startTime.set(System.currentTimeMillis());
763+
try {
764+
executeProcedure(procedure);
765+
} finally {
766+
procedure.releaseExecution();
767+
activeExecutorCount.decrementAndGet();
768+
LOG.trace(
769+
"Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
770+
this.activeProcedure.set(null);
771+
lastUpdated = System.currentTimeMillis();
772+
startTime.set(lastUpdated);
773+
}
774+
} catch (Exception e) {
775+
LOG.warn(
776+
"Exception happened when worker {} execute procedure {}", getName(), procedure, e);
777+
throw e;
778+
}
741779
}
742780

743781
} catch (Exception e) {
@@ -748,6 +786,7 @@ public void run() {
748786
this.activeProcedure.get(),
749787
e);
750788
}
789+
this.activeProcedure.set(null);
751790
} finally {
752791
LOG.info("Procedure worker {} terminated.", getName());
753792
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,13 @@ public TimeoutExecutorThread(
3737
}
3838

3939
public void add(Procedure<Env> procedure) {
40-
queue.add(new ProcedureDelayContainer<>(procedure));
40+
ProcedureDelayContainer<Env> delayTask = new ProcedureDelayContainer<>(procedure);
41+
queue.remove(delayTask);
42+
queue.add(delayTask);
4143
}
4244

4345
public boolean remove(Procedure<Env> procedure) {
44-
return queue.remove(new ProcedureDelayContainer<>(procedure));
46+
return queue.remove(new ProcedureDelayContainer<>(procedure)) || procedure.isFinished();
4547
}
4648

4749
private ProcedureDelayContainer<Env> takeQuietly() {
@@ -62,10 +64,15 @@ public void run() {
6264
}
6365
Procedure<Env> procedure = delayTask.getProcedure();
6466
if (procedure instanceof InternalProcedure) {
67+
if (procedure.isFinished()) {
68+
continue;
69+
}
6570
InternalProcedure internal = (InternalProcedure) procedure;
6671
internal.periodicExecute(executor.getEnvironment());
67-
procedure.updateTimestamp();
68-
queue.add(delayTask);
72+
if (!procedure.isFinished()) {
73+
procedure.updateTimestamp();
74+
queue.add(delayTask);
75+
}
6976
} else {
7077
if (procedure.setTimeoutFailure(executor.getEnvironment())) {
7178
long rootProcId = executor.getRootProcedureId(procedure);
@@ -92,6 +99,23 @@ public Procedure<Env> getProcedure() {
9299
return procedure;
93100
}
94101

102+
@Override
103+
public boolean equals(Object o) {
104+
if (this == o) {
105+
return true;
106+
}
107+
if (!(o instanceof ProcedureDelayContainer)) {
108+
return false;
109+
}
110+
ProcedureDelayContainer<?> that = (ProcedureDelayContainer<?>) o;
111+
return procedure == that.procedure;
112+
}
113+
114+
@Override
115+
public int hashCode() {
116+
return System.identityHashCode(procedure);
117+
}
118+
95119
@Override
96120
public long getDelay(TimeUnit unit) {
97121
long delay = procedure.getTimeoutTimestamp() - System.currentTimeMillis();

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ private void addNextStateAndCalculateCycles() {
195195
nextState);
196196
}
197197
}
198-
if (getStateId(getCurrentState()) == stateToBeAdded) {
198+
final TState currentState = getCurrentState();
199+
if (currentState != null && getStateId(currentState) == stateToBeAdded) {
199200
cycles++;
200201
} else {
201202
cycles = 0;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ public boolean equals(Object o) {
295295
}
296296
CreateCQProcedure that = (CreateCQProcedure) o;
297297
return getProcId() == that.getProcId()
298-
&& getCurrentState().equals(that.getCurrentState())
298+
&& Objects.equals(getCurrentState(), that.getCurrentState())
299299
&& getCycles() == that.getCycles()
300300
&& isGeneratedByPipe == that.isGeneratedByPipe
301301
&& firstExecutionTime == that.firstExecutionTime

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,18 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced
5656
}
5757
}
5858

59+
@Override
60+
protected void waitForLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
61+
configNodeProcedureEnv.getSchedulerLock().lock();
62+
try {
63+
configNodeProcedureEnv
64+
.getNodeLock()
65+
.waitProcedure(this, configNodeProcedureEnv.getScheduler());
66+
} finally {
67+
configNodeProcedureEnv.getSchedulerLock().unlock();
68+
}
69+
}
70+
5971
@Override
6072
protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
6173
configNodeProcedureEnv.getSchedulerLock().lock();

0 commit comments

Comments
 (0)