-
Notifications
You must be signed in to change notification settings - Fork 6
Drain LibMR threads to a safe point before fork() (MOD-15307) #97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 3 commits
2763e7d
0b2a3b1
a774007
03aa31e
f6fe687
995f842
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,8 @@ | |
| #include "utils/buffer.h" | ||
|
|
||
| #include <pthread.h> | ||
| #include <time.h> | ||
| #include <errno.h> | ||
| #include <hiredis.h> | ||
| #ifndef EXECUTION_DEFAULT_MAX_IDLE_MS | ||
| #define EXECUTION_DEFAULT_MAX_IDLE_MS 5000 | ||
|
|
@@ -286,6 +288,16 @@ struct Execution { | |
| ExecutionCallbacks callbacks; | ||
| MR_LoopTaskCtx* timeoutTask; | ||
| size_t timeoutMS; | ||
|
|
||
| /* MOD-14615 diagnostics: so a max-idle timeout can name the shard(s) that | ||
| * did not reply. respondedAck/respondedDone hold the sender node-ids that | ||
| * sent an ACK / NOTIFY_DONE; pending = cluster peers minus that set. The | ||
| * timestamps give the elapsed wait. Touched only on the event-loop thread | ||
| * (ack/done/timeout handlers), so no extra locking is needed. */ | ||
| mr_dict* respondedAck; | ||
| mr_dict* respondedDone; | ||
| long long dispatchMonoMs; | ||
| long long lastProgressMonoMs; | ||
| }; | ||
|
|
||
| struct ExecutionCtx { | ||
|
|
@@ -531,9 +543,24 @@ static Execution* MR_ExecutionAlloc() { | |
| e->timeoutTask = NULL; | ||
| e->timeoutMS = EXECUTION_DEFAULT_MAX_IDLE_MS; | ||
| e->flags = 0; | ||
| /* MOD-14615 diagnostics. Node-ids are NUL-terminated strings, so use the | ||
| * heap-strings dict type (matches the cluster node table), NOT the fixed | ||
| * ID_LEN dict type used for execution ids. */ | ||
| e->respondedAck = mr_dictCreate(&mr_dictTypeHeapStrings, NULL); | ||
| e->respondedDone = mr_dictCreate(&mr_dictTypeHeapStrings, NULL); | ||
| e->dispatchMonoMs = 0; | ||
| e->lastProgressMonoMs = 0; | ||
| return e; | ||
| } | ||
|
|
||
| /* MOD-14615: monotonic milliseconds for measuring how long an execution waited | ||
| * for peer replies. CLOCK_MONOTONIC so it is unaffected by wall-clock changes. | ||
| * | ||
| * Returns tv_sec * 1000 + tv_nsec / 1000000 as read from clock_gettime(), i.e. whole | ||
| * milliseconds with the sub-millisecond remainder truncated. Callers in this file only | ||
| * use the value as a difference between two readings (and treat a stored 0 as "never | ||
| * stamped"). | ||
| * | ||
| * NOTE(review): the clock_gettime() return value is not checked; if the call ever | ||
| * failed, `t` would be read uninitialized. */ | ||
| static long long mr_nowMonoMs(void) { | ||
| struct timespec t; | ||
| clock_gettime(CLOCK_MONOTONIC, &t); | ||
| return (long long)t.tv_sec * 1000LL + t.tv_nsec / 1000000LL; | ||
| } | ||
|
|
||
| Execution* MR_CreateExecution(ExecutionBuilder* builder, MRError** err) { | ||
| if (!MR_IsClusterInitialize()) { | ||
| *err = &UINITIALIZED_CLUSTER_ERROR; | ||
|
|
@@ -1016,6 +1043,8 @@ static void MR_NotifyDone(RedisModuleCtx *ctx, const char *sender_id, uint8_t ty | |
| return; | ||
| } | ||
|
|
||
| if (sender_id) mr_dictAdd(e->respondedDone, (void*)sender_id, NULL); /* MOD-14615 */ | ||
| e->lastProgressMonoMs = mr_nowMonoMs(); | ||
| ++e->nCompleted; | ||
| if (e->nCompleted == MR_ClusterGetSize() - 1) { | ||
| /* Execution is finished on all the shards, | ||
|
|
@@ -1077,6 +1106,8 @@ static void MR_AckExecution(RedisModuleCtx *ctx, const char *sender_id, uint8_t | |
| return; | ||
| } | ||
|
|
||
| if (sender_id) mr_dictAdd(e->respondedAck, (void*)sender_id, NULL); /* MOD-14615 */ | ||
| e->lastProgressMonoMs = mr_nowMonoMs(); | ||
| ++e->nReceived; | ||
| if (e->nReceived == MR_ClusterGetSize() - 1) { | ||
| /* all shards have received the execution, we can invoke it. */ | ||
|
|
@@ -1293,6 +1324,12 @@ static void MR_ExecutionDistribute(Execution* e, void* pd) { | |
| MR_ClusterSendMsg(NULL, fid, buff.buff, buff.size); | ||
|
|
||
| /* now we wait for shards to respond that they got the execution */ | ||
| /* MOD-14615: stamp dispatch so a max-idle timeout can report the elapsed | ||
| * wait. Debug-level (per-execution, would be noisy at notice). */ | ||
| e->dispatchMonoMs = e->lastProgressMonoMs = mr_nowMonoMs(); | ||
| RedisModule_Log(mr_staticCtx, "debug", | ||
| "MR execution %s dispatched to %zu peers (timeoutMS=%zu)", | ||
| e->idStr, MR_ClusterGetSize() - 1, e->timeoutMS); | ||
| } | ||
|
|
||
| static void MR_ExecutionTimedOutInternal(Execution* e, void* pd) { | ||
|
|
@@ -1310,6 +1347,29 @@ static void MR_ExecutionTimedOut(void* ctx) { | |
| /* execution timed out */ | ||
| e->timeoutTask = NULL; | ||
| ++mrCtx.stats.nMaxIdleReached; | ||
|
|
||
| /* MOD-14615: name the shard(s) that did not reply before we discard the | ||
| * execution. We are stalled in the ACK phase if not all shards acknowledged | ||
| * receipt, otherwise in the DONE phase; pick the matching responded set so | ||
| * `pending` lists exactly the non-responders. Warning level: this coincides | ||
| * with the user-visible multi-shard failure and is rate-bounded by | ||
| * nMaxIdleReached, and warning IS captured at the default loglevel. */ | ||
| { | ||
| size_t expected = MR_ClusterGetSize() - 1; | ||
| long long now = mr_nowMonoMs(); | ||
| int ackPhase = (e->nReceived < expected); | ||
| mr_dict* got = ackPhase ? e->respondedAck : e->respondedDone; | ||
| size_t nGot = ackPhase ? e->nReceived : e->nCompleted; | ||
| char pending[1024]; | ||
| MR_ClusterFormatPendingPeers(got, pending, sizeof(pending)); | ||
| RedisModule_Log(mr_staticCtx, "warning", | ||
| "MR execution %s max-idle reached: phase=%s replies=%zu/%zu waited=%lldms " | ||
| "sinceLastProgress=%lldms pending=[%s]", | ||
| e->idStr, ackPhase ? "ack" : "done", nGot, expected, | ||
| e->dispatchMonoMs ? now - e->dispatchMonoMs : -1, | ||
| e->lastProgressMonoMs ? now - e->lastProgressMonoMs : -1, pending); | ||
| } | ||
|
|
||
| /* Delete the execution from the executions dictionary, | ||
| * We will ignore further messages on this execution. */ | ||
| mr_dictDelete(mrCtx.executionsDict, e->id); | ||
|
|
@@ -1408,6 +1468,121 @@ void MR_ExecutionSetMaxIdle(Execution* e, size_t maxIdle) { | |
| e->timeoutMS = maxIdle; | ||
| } | ||
|
|
||
| /* ---- MOD-15307: drain LibMR background threads to a safe point before fork() ---- | ||
| * | ||
| * A LibMR thread (a worker in the execution pool, or the event-loop thread) that holds | ||
| * a libc lock (e.g. the malloc arena lock) at the instant redis calls fork() leaves the | ||
| * child holding a locked mutex with no owner -> the child ghost-locks the first time it | ||
| * mallocs (RDB save, slot-migration snapshot, etc.). That is the root trigger behind the | ||
| * ASM-migration nightly hangs (MOD-15307) and, downstream, the multi-shard query max-idle | ||
| * timeout (MOD-14615). | ||
| * | ||
| * MR_DrainForFork() (called on the main thread from the FORK_CHILD_PRE module event) brings | ||
| * both kinds of LibMR thread to a safe, lock-free point: | ||
| * - the event-loop thread is parked *between tasks* via a posted task (so it is not | ||
| * mid-message-deserialize / mid-malloc, and is not holding the module GIL); | ||
| * - with the event-loop thread parked no new executions are dispatched, so the worker | ||
| * pool drains to idle. | ||
| * Both waits are bounded; on timeout we fork anyway (fail-open -> never worse than today). | ||
| * Note a worker still blocked acquiring the module GIL (held by us) is itself malloc-safe, | ||
| * so a timeout there is benign. MR_ResumeAfterFork() releases the parked event-loop thread | ||
| * and must be called after fork() (FORK_CHILD_BORN) or if the fork was cancelled. | ||
| * | ||
| * State declared below: MR_FORK_DRAIN_TIMEOUT_MS is the single budget, in milliseconds, | ||
| * shared by both waits; mr_forkDrainLock protects mr_forkElParked and mr_forkElRelease | ||
| * (every read and write of the two flags in this file happens with the lock held), and | ||
| * mr_forkDrainCond is broadcast whenever either flag is set to 1. */ | ||
| #define MR_FORK_DRAIN_TIMEOUT_MS 2000 | ||
|
|
||
| static pthread_mutex_t mr_forkDrainLock = PTHREAD_MUTEX_INITIALIZER; | ||
| static pthread_cond_t mr_forkDrainCond = PTHREAD_COND_INITIALIZER; | ||
| static int mr_forkElParked = 0; /* set by the park task once the el-thread is quiesced */ | ||
| static int mr_forkElRelease = 0; /* set by MR_ResumeAfterFork to release the el-thread */ | ||
|
|
||
| /* Runs on the event-loop thread, between tasks (a safe point). Parks the thread until | ||
| * MR_ResumeAfterFork() is called. | ||
| * | ||
| * With mr_forkDrainLock held it sets mr_forkElParked and broadcasts mr_forkDrainCond so | ||
| * the waiter in MR_DrainForFork() can proceed, then blocks on the same condition variable | ||
| * until mr_forkElRelease is non-zero. The while loop re-checks the flag after every | ||
| * wakeup, so a spurious wakeup simply goes back to waiting. mr_forkElParked is cleared | ||
| * again before the lock is dropped. | ||
| * | ||
| * If mr_forkElRelease is already set when the task runs (for example a park task that was | ||
| * still queued when MR_DrainForFork() timed out and MR_ResumeAfterFork() has since run), | ||
| * the wait loop is skipped and the task returns immediately. | ||
| * | ||
| * ctx is unused; MR_DrainForFork() posts this task with NULL. */ | ||
| static void MR_ForkParkElThread(void* ctx) { | ||
| REDISMODULE_NOT_USED(ctx); | ||
| pthread_mutex_lock(&mr_forkDrainLock); | ||
| mr_forkElParked = 1; | ||
| pthread_cond_broadcast(&mr_forkDrainCond); | ||
| while (!mr_forkElRelease) { | ||
| pthread_cond_wait(&mr_forkDrainCond, &mr_forkDrainLock); | ||
| } | ||
| mr_forkElParked = 0; | ||
| pthread_mutex_unlock(&mr_forkDrainLock); | ||
| } | ||
|
|
||
| static int mr_forkDeadlinePassed(const struct timespec* deadline) { /* Returns non-zero once | ||
| * the current CLOCK_REALTIME time is at or past *deadline (seconds are compared first, | ||
| * then nanoseconds), and zero otherwise. CLOCK_REALTIME is the same clock | ||
| * MR_DrainForFork() reads when it builds the deadline it passes in here. */ | ||
| struct timespec now; | ||
| clock_gettime(CLOCK_REALTIME, &now); | ||
| return (now.tv_sec > deadline->tv_sec) || | ||
| (now.tv_sec == deadline->tv_sec && now.tv_nsec >= deadline->tv_nsec); | ||
| } | ||
|
|
||
| void MR_DrainForFork(void) { | ||
| /* Overview: bounded, best-effort quiesce of LibMR threads ahead of fork(). A single | ||
| * absolute CLOCK_REALTIME deadline, MR_FORK_DRAIN_TIMEOUT_MS after entry, is shared by | ||
| * both phases below (event-loop park, then worker drain); when it expires the function | ||
| * logs a warning and returns instead of waiting further. Takes no arguments and returns | ||
| * nothing; the outcome is only visible in the log. | ||
| * NOTE(review): because the deadline is on CLOCK_REALTIME, a wall-clock step during the | ||
| * drain changes the effective budget -- confirm that is acceptable. | ||
| * | ||
| * Nothing to drain if the pool / event loop were never started. */ | ||
| if (!mrCtx.executionsThreadPool) return; | ||
|
|
||
||
| /* Precise drain-only timing: measures just the quiesce (park + worker wait), | ||
| * excluding the fork() that follows. */ | ||
| struct timespec _drain_t0; | ||
| clock_gettime(CLOCK_MONOTONIC, &_drain_t0); | ||
|
|
||
||
| struct timespec deadline; | ||
| clock_gettime(CLOCK_REALTIME, &deadline); | ||
| deadline.tv_sec += MR_FORK_DRAIN_TIMEOUT_MS / 1000; | ||
| deadline.tv_nsec += (MR_FORK_DRAIN_TIMEOUT_MS % 1000) * 1000000L; | ||
| if (deadline.tv_nsec >= 1000000000L) { deadline.tv_sec++; deadline.tv_nsec -= 1000000000L; } | ||
|
|
||
||
| /* 1) Park the event-loop thread at a between-tasks safe point. | ||
| * Both flags are reset under the lock before the park task is posted; we then wait on | ||
| * the condition variable until the task reports it is parked or the deadline expires. | ||
| * NOTE(review): the wait loop below only exits on ETIMEDOUT; any other non-zero return | ||
| * from pthread_cond_timedwait() would be retried -- confirm that is intended. */ | ||
| pthread_mutex_lock(&mr_forkDrainLock); | ||
| mr_forkElParked = 0; | ||
| mr_forkElRelease = 0; | ||
| pthread_mutex_unlock(&mr_forkDrainLock); | ||
|
|
||
||
| MR_EventLoopAddTask(MR_ForkParkElThread, NULL); | ||
|
|
||
||
| pthread_mutex_lock(&mr_forkDrainLock); | ||
| while (!mr_forkElParked) { | ||
| if (pthread_cond_timedwait(&mr_forkDrainCond, &mr_forkDrainLock, &deadline) == ETIMEDOUT) | ||
| break; | ||
| } | ||
| int parked = mr_forkElParked; | ||
| pthread_mutex_unlock(&mr_forkDrainLock); | ||
|
|
||
||
| if (!parked) { | ||
| RedisModule_Log(mr_staticCtx, "warning", | ||
| "MR_DrainForFork: event-loop thread did not park within %dms; forking anyway", | ||
| MR_FORK_DRAIN_TIMEOUT_MS); | ||
| } | ||
|
|
||
||
| /* 2) Wait (bounded) for in-flight worker jobs to finish. With the event loop parked no | ||
| * new work is dispatched, so the working count drains to 0 unless a worker is blocked | ||
| * acquiring the GIL we hold -- a malloc-safe state, so a timeout there is benign. | ||
| * The count is polled with a 1ms nanosleep() between checks, against the same deadline | ||
| * that bounded phase 1. */ | ||
| while (mr_thpool_num_threads_working(mrCtx.executionsThreadPool) > 0) { | ||
| if (mr_forkDeadlinePassed(&deadline)) { | ||
| RedisModule_Log(mr_staticCtx, "warning", | ||
| "MR_DrainForFork: %d worker(s) still busy at timeout; forking anyway", | ||
| mr_thpool_num_threads_working(mrCtx.executionsThreadPool)); | ||
| break; | ||
| } | ||
| struct timespec nap = { .tv_sec = 0, .tv_nsec = 1000000L }; /* 1ms */ | ||
| nanosleep(&nap, NULL); | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shared fork drain timeoutMedium Severity
Reviewed by Cursor Bugbot for commit f6fe687. Configure here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one is intentional. In the normal case the park task runs in microseconds, so phase two effectively gets the full budget (measured median drain ~62us). If the park itself cannot complete within the budget, the event-loop thread is stuck off a safe point and the correct bounded fallback is to fork anyway — the drain is best-effort, not a hard barrier. Giving phase two its own fresh 2s budget would double the worst-case main-thread stall, which we deliberately avoid. |
||
|
|
||
| struct timespec _drain_t1; | ||
| clock_gettime(CLOCK_MONOTONIC, &_drain_t1); | ||
| long long _drain_us = (long long)(_drain_t1.tv_sec - _drain_t0.tv_sec) * 1000000LL + | ||
| (_drain_t1.tv_nsec - _drain_t0.tv_nsec) / 1000LL; | ||
| RedisModule_Log(mr_staticCtx, "debug", | ||
| "MR_DrainForFork drained in %lld us (el_parked=%d, busy_workers=%d)", | ||
| _drain_us, parked, mr_thpool_num_threads_working(mrCtx.executionsThreadPool)); | ||
| } | ||
|
|
||
| void MR_ResumeAfterFork(void) { /* Releases an event-loop thread parked by | ||
| * MR_ForkParkElThread(): sets mr_forkElRelease under mr_forkDrainLock and broadcasts | ||
| * mr_forkDrainCond. Returns without doing anything when mrCtx.executionsThreadPool is | ||
| * unset, mirroring the early return in MR_DrainForFork(). The flag stays set until the | ||
| * next MR_DrainForFork() resets it, so a park task that runs late passes straight | ||
| * through. */ | ||
| if (!mrCtx.executionsThreadPool) return; | ||
| pthread_mutex_lock(&mr_forkDrainLock); | ||
| mr_forkElRelease = 1; | ||
| pthread_cond_broadcast(&mr_forkDrainCond); | ||
| pthread_mutex_unlock(&mr_forkDrainLock); | ||
| } | ||
|
|
||
| void MR_Run(Execution* e) { | ||
| /* take ownership on the execution */ | ||
| __atomic_add_fetch(&e->refCount, 1, __ATOMIC_RELAXED); | ||
|
|
@@ -1523,6 +1698,8 @@ void MR_FreeExecution(Execution* e) { | |
| MR_RecordFree(e->errors[i]); | ||
| } | ||
| array_free(e->errors); | ||
| if (e->respondedAck) mr_dictRelease(e->respondedAck); /* MOD-14615 */ | ||
| if (e->respondedDone) mr_dictRelease(e->respondedDone); /* MOD-14615 */ | ||
| MR_FREE(e); | ||
| } | ||
|
|
||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.