Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,31 @@ size_t MR_ClusterGetSize(){
return clusterCtx.clusterSize;
}

/* MOD-14615: format the peer shards that are NOT me and NOT present in the
* `responded` set into "id(ip:port),..." in `out`, so a max-idle timeout can
* name the non-responding shard(s). Runs on the event-loop thread (same as the
* message handlers), so reading the cluster node table needs no extra locking.
* The node-id is the same NUL-terminated string used as the responded-set key. */
void MR_ClusterFormatPendingPeers(mr_dict* responded, char* out, size_t outLen) {
if (outLen == 0) return;
out[0] = '\0';
if (!clusterCtx.CurrCluster || !clusterCtx.CurrCluster->nodes) return;
mr_dictIterator *iter = mr_dictGetIterator(clusterCtx.CurrCluster->nodes);
mr_dictEntry *entry = NULL;
size_t off = 0;
while ((entry = mr_dictNext(iter))) {
Node* n = mr_dictGetVal(entry);
if (n->isMe) continue;
if (responded && mr_dictFind(responded, n->id)) continue;
int w = snprintf(out + off, outLen - off, "%s%s(%s:%u)",
off ? "," : "", n->id, n->ip ? n->ip : "?", n->port);
if (w < 0) break;
if ((size_t)w >= outLen - off) { out[outLen - 1] = '\0'; break; } /* truncated */
off += (size_t)w;
}
mr_dictReleaseIterator(iter);
}

int MR_ClusterIsClusterMode(){
return MR_ClusterGetSize() > 1;
}
Expand Down
7 changes: 7 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ int MR_IsClusterInitialize();

size_t MR_ClusterGetSize();

/* MOD-14615: format peer shards (id(ip:port)) that are NOT in `responded` into
* `out`, used to name non-responding shards on a max-idle timeout. The tag is
* forward-declared so cluster.h stays independent of utils/dict.h include order
* (full definition lives in utils/dict.h). */
struct mr_dict;
void MR_ClusterFormatPendingPeers(struct mr_dict* responded, char* out, size_t outLen);

int MR_ClusterInit(RedisModuleCtx* rctx, char *password);

size_t MR_ClusterGetSlotByKey(const char* key, size_t len);
Expand Down
182 changes: 182 additions & 0 deletions src/mr.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "utils/buffer.h"

#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <hiredis.h>
#ifndef EXECUTION_DEFAULT_MAX_IDLE_MS
#define EXECUTION_DEFAULT_MAX_IDLE_MS 5000
Expand Down Expand Up @@ -286,6 +288,16 @@ struct Execution {
ExecutionCallbacks callbacks;
MR_LoopTaskCtx* timeoutTask;
size_t timeoutMS;

/* MOD-14615 diagnostics: so a max-idle timeout can name the shard(s) that
* did not reply. respondedAck/respondedDone hold the sender node-ids that
* sent an ACK / NOTIFY_DONE; pending = cluster peers minus that set. The
* timestamps give the elapsed wait. Touched only on the event-loop thread
* (ack/done/timeout handlers), so no extra locking is needed. */
mr_dict* respondedAck;
mr_dict* respondedDone;
long long dispatchMonoMs;
long long lastProgressMonoMs;
};

struct ExecutionCtx {
Expand Down Expand Up @@ -531,9 +543,24 @@ static Execution* MR_ExecutionAlloc() {
e->timeoutTask = NULL;
e->timeoutMS = EXECUTION_DEFAULT_MAX_IDLE_MS;
e->flags = 0;
/* MOD-14615 diagnostics. Node-ids are NUL-terminated strings, so use the
* heap-strings dict type (matches the cluster node table), NOT the fixed
* ID_LEN dict type used for execution ids. */
e->respondedAck = mr_dictCreate(&mr_dictTypeHeapStrings, NULL);
e->respondedDone = mr_dictCreate(&mr_dictTypeHeapStrings, NULL);
e->dispatchMonoMs = 0;
e->lastProgressMonoMs = 0;
return e;
}

/* MOD-14615: monotonic milliseconds for measuring how long an execution waited
* for peer replies. CLOCK_MONOTONIC so it is unaffected by wall-clock changes. */
static long long mr_nowMonoMs(void) {
struct timespec t;
clock_gettime(CLOCK_MONOTONIC, &t);
return (long long)t.tv_sec * 1000LL + t.tv_nsec / 1000000LL;
}

Execution* MR_CreateExecution(ExecutionBuilder* builder, MRError** err) {
if (!MR_IsClusterInitialize()) {
*err = &UINITIALIZED_CLUSTER_ERROR;
Expand Down Expand Up @@ -1016,6 +1043,8 @@ static void MR_NotifyDone(RedisModuleCtx *ctx, const char *sender_id, uint8_t ty
return;
}

if (sender_id) mr_dictAdd(e->respondedDone, (void*)sender_id, NULL); /* MOD-14615 */
e->lastProgressMonoMs = mr_nowMonoMs();
++e->nCompleted;
if (e->nCompleted == MR_ClusterGetSize() - 1) {
/* Execution is finished on all the shards,
Expand Down Expand Up @@ -1077,6 +1106,8 @@ static void MR_AckExecution(RedisModuleCtx *ctx, const char *sender_id, uint8_t
return;
}

if (sender_id) mr_dictAdd(e->respondedAck, (void*)sender_id, NULL); /* MOD-14615 */
e->lastProgressMonoMs = mr_nowMonoMs();
++e->nReceived;
if (e->nReceived == MR_ClusterGetSize() - 1) {
/* all shards have received the execution, we can invoke it. */
Expand Down Expand Up @@ -1293,6 +1324,12 @@ static void MR_ExecutionDistribute(Execution* e, void* pd) {
MR_ClusterSendMsg(NULL, fid, buff.buff, buff.size);

/* now we wait for shards to respond that they got the execution */
/* MOD-14615: stamp dispatch so a max-idle timeout can report the elapsed
* wait. Debug-level (per-execution, would be noisy at notice). */
e->dispatchMonoMs = e->lastProgressMonoMs = mr_nowMonoMs();
RedisModule_Log(mr_staticCtx, "debug",
"MR execution %s dispatched to %zu peers (timeoutMS=%zu)",
e->idStr, MR_ClusterGetSize() - 1, e->timeoutMS);
}

static void MR_ExecutionTimedOutInternal(Execution* e, void* pd) {
Expand All @@ -1310,6 +1347,29 @@ static void MR_ExecutionTimedOut(void* ctx) {
/* execution timed out */
e->timeoutTask = NULL;
++mrCtx.stats.nMaxIdleReached;

/* MOD-14615: name the shard(s) that did not reply before we discard the
* execution. We are stalled in the ACK phase if not all shards acknowledged
* receipt, otherwise in the DONE phase; pick the matching responded set so
* `pending` lists exactly the non-responders. Warning level: this coincides
* with the user-visible multi-shard failure and is rate-bounded by
* nMaxIdleReached, and warning IS captured at the default loglevel. */
{
size_t expected = MR_ClusterGetSize() - 1;
long long now = mr_nowMonoMs();
int ackPhase = (e->nReceived < expected);
mr_dict* got = ackPhase ? e->respondedAck : e->respondedDone;
size_t nGot = ackPhase ? e->nReceived : e->nCompleted;
char pending[1024];
MR_ClusterFormatPendingPeers(got, pending, sizeof(pending));
RedisModule_Log(mr_staticCtx, "warning",
"MR execution %s max-idle reached: phase=%s replies=%zu/%zu waited=%lldms "
"sinceLastProgress=%lldms pending=[%s]",
e->idStr, ackPhase ? "ack" : "done", nGot, expected,
e->dispatchMonoMs ? now - e->dispatchMonoMs : -1,
e->lastProgressMonoMs ? now - e->lastProgressMonoMs : -1, pending);
}

/* Delete the execution from the executions dictionary,
* We will ignore further messages on this execution. */
mr_dictDelete(mrCtx.executionsDict, e->id);
Expand Down Expand Up @@ -1408,6 +1468,126 @@ void MR_ExecutionSetMaxIdle(Execution* e, size_t maxIdle) {
e->timeoutMS = maxIdle;
}

/* ---- MOD-15307: drain LibMR background threads to a safe point before fork() ----
*
* A LibMR thread (a worker in the execution pool, or the event-loop thread) that holds
* a libc lock (e.g. the malloc arena lock) at the instant redis calls fork() leaves the
* child holding a locked mutex with no owner -> the child ghost-locks the first time it
* mallocs (RDB save, slot-migration snapshot, etc.). That is the root trigger behind the
* ASM-migration nightly hangs (MOD-15307) and, downstream, the multi-shard query max-idle
* timeout (MOD-14615).
*
* MR_DrainForFork() (called on the main thread from the FORK_CHILD_PRE module event) brings
* both kinds of LibMR thread to a safe, lock-free point:
* - the event-loop thread is parked *between tasks* via a posted task (so it is not
* mid-message-deserialize / mid-malloc, and is not holding the module GIL);
* - with the event-loop thread parked no new executions are dispatched, so the worker
* pool drains to idle.
* Both waits are bounded; on timeout we fork anyway (fail-open -> never worse than today).
* Note a worker still blocked acquiring the module GIL (held by us) is itself malloc-safe,
* so a timeout there is benign. MR_ResumeAfterFork() releases the parked event-loop thread
* and must be called after fork() (FORK_CHILD_BORN) or if the fork was cancelled. */
#define MR_FORK_DRAIN_TIMEOUT_MS 2000

static pthread_mutex_t mr_forkDrainLock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t mr_forkDrainCond = PTHREAD_COND_INITIALIZER;
static int mr_forkElParked = 0; /* set by the park task once the el-thread is quiesced */
static int mr_forkElRelease = 0; /* set by MR_ResumeAfterFork to release the el-thread */

/* Runs on the event-loop thread, between tasks (a safe point). Parks the thread until
* MR_ResumeAfterFork() is called. */
static void MR_ForkParkElThread(void* ctx) {
REDISMODULE_NOT_USED(ctx);
pthread_mutex_lock(&mr_forkDrainLock);
mr_forkElParked = 1;
pthread_cond_broadcast(&mr_forkDrainCond);
while (!mr_forkElRelease) {
pthread_cond_wait(&mr_forkDrainCond, &mr_forkDrainLock);
}
mr_forkElParked = 0;
pthread_mutex_unlock(&mr_forkDrainLock);
}

static int mr_forkDeadlinePassed(const struct timespec* deadline) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
return (now.tv_sec > deadline->tv_sec) ||
(now.tv_sec == deadline->tv_sec && now.tv_nsec >= deadline->tv_nsec);
}

void MR_DrainForFork(void) {
/* Nothing to drain if the pool / event loop were never started. */
if (!mrCtx.executionsThreadPool) return;

/* Precise drain-only timing: measures just the quiesce (park + worker wait),
* excluding the fork() that follows. */
struct timespec _drain_t0;
clock_gettime(CLOCK_MONOTONIC, &_drain_t0);

struct timespec deadline;
clock_gettime(CLOCK_REALTIME, &deadline);
deadline.tv_sec += MR_FORK_DRAIN_TIMEOUT_MS / 1000;
deadline.tv_nsec += (MR_FORK_DRAIN_TIMEOUT_MS % 1000) * 1000000L;
if (deadline.tv_nsec >= 1000000000L) { deadline.tv_sec++; deadline.tv_nsec -= 1000000000L; }

/* 1) Park the event-loop thread at a between-tasks safe point. */
pthread_mutex_lock(&mr_forkDrainLock);
mr_forkElParked = 0;
mr_forkElRelease = 0;
pthread_mutex_unlock(&mr_forkDrainLock);

MR_EventLoopAddTask(MR_ForkParkElThread, NULL);

pthread_mutex_lock(&mr_forkDrainLock);
while (!mr_forkElParked) {
if (pthread_cond_timedwait(&mr_forkDrainCond, &mr_forkDrainLock, &deadline) == ETIMEDOUT)
break;
}
int parked = mr_forkElParked;
pthread_mutex_unlock(&mr_forkDrainLock);

if (!parked) {
RedisModule_Log(mr_staticCtx, "warning",
"MR_DrainForFork: event-loop thread did not park within %dms; forking anyway",
MR_FORK_DRAIN_TIMEOUT_MS);
}

/* 2) Wait (bounded) for the worker pool to go fully idle -- both in-flight jobs AND jobs
* still queued but not yet picked up (a queued job would otherwise start running, and
* allocate, right after the fork). With the event loop parked no new work is dispatched,
* so the pool drains to empty unless a worker is blocked acquiring the GIL we hold -- a
* malloc-safe state, so a timeout there is benign. */
while (mr_thpool_num_threads_working(mrCtx.executionsThreadPool) > 0 ||
mr_thpool_num_jobs_in_queue(mrCtx.executionsThreadPool) > 0) {
if (mr_forkDeadlinePassed(&deadline)) {
RedisModule_Log(mr_staticCtx, "warning",
"MR_DrainForFork: pool not idle at timeout (busy=%d, queued=%d); forking anyway",
mr_thpool_num_threads_working(mrCtx.executionsThreadPool),
mr_thpool_num_jobs_in_queue(mrCtx.executionsThreadPool));
break;
}
struct timespec nap = { .tv_sec = 0, .tv_nsec = 1000000L }; /* 1ms */
nanosleep(&nap, NULL);
}
Comment thread
cursor[bot] marked this conversation as resolved.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shared fork drain timeout

Medium Severity

MR_DrainForFork uses one CLOCK_REALTIME deadline for both parking the event-loop thread and waiting for the worker pool. Time spent waiting for the park task (including a long event-loop backlog before it runs) is subtracted from the same budget, so phase two often gets little or no time even after a successful park, and after a park timeout the pool loop exits immediately because the deadline is already past.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit f6fe687. Configure here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is intentional. MR_DrainForFork runs synchronously on redis's main thread before every fork, so the single MR_FORK_DRAIN_TIMEOUT_MS (2s) deadline deliberately bounds the total main-thread stall — we do not want a fork (RDB/AOF/ASM snapshot) to block the server longer than that.

In the normal case the park task runs in microseconds, so phase two effectively gets the full budget (measured median drain ~62us). If the park itself cannot complete within the budget, the event-loop thread is stuck off a safe point and the correct bounded fallback is to fork anyway — the drain is best-effort, not a hard barrier. Giving phase two its own fresh 2s budget would double the worst-case main-thread stall, which we deliberately avoid.


struct timespec _drain_t1;
clock_gettime(CLOCK_MONOTONIC, &_drain_t1);
long long _drain_us = (long long)(_drain_t1.tv_sec - _drain_t0.tv_sec) * 1000000LL +
(_drain_t1.tv_nsec - _drain_t0.tv_nsec) / 1000LL;
RedisModule_Log(mr_staticCtx, "debug",
"MR_DrainForFork drained in %lld us (el_parked=%d, busy_workers=%d, queued=%d)",
_drain_us, parked, mr_thpool_num_threads_working(mrCtx.executionsThreadPool),
mr_thpool_num_jobs_in_queue(mrCtx.executionsThreadPool));
}

void MR_ResumeAfterFork(void) {
if (!mrCtx.executionsThreadPool) return;
pthread_mutex_lock(&mr_forkDrainLock);
mr_forkElRelease = 1;
pthread_cond_broadcast(&mr_forkDrainCond);
pthread_mutex_unlock(&mr_forkDrainLock);
}

void MR_Run(Execution* e) {
/* take ownership on the execution */
__atomic_add_fetch(&e->refCount, 1, __ATOMIC_RELAXED);
Expand Down Expand Up @@ -1523,6 +1703,8 @@ void MR_FreeExecution(Execution* e) {
MR_RecordFree(e->errors[i]);
}
array_free(e->errors);
if (e->respondedAck) mr_dictRelease(e->respondedAck); /* MOD-14615 */
if (e->respondedDone) mr_dictRelease(e->respondedDone); /* MOD-14615 */
MR_FREE(e);
}

Expand Down
8 changes: 8 additions & 0 deletions src/mr.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ LIBMR_API bool MR_IsInternalCommandsExecution(const Execution* e);
/* Free the given execution */
LIBMR_API void MR_FreeExecution(Execution* e);

/* MOD-15307: drain LibMR background threads (worker pool + event-loop thread) to a safe,
* lock-free point before fork(), so the forked child does not inherit a libc lock held by a
* LibMR thread (ghost-lock). Bounded (a few seconds), then proceeds anyway (fail-open). Call
* on the main thread from the FORK_CHILD_PRE module event; pair every call with exactly one
* MR_ResumeAfterFork() (on FORK_CHILD_BORN, or if the fork was cancelled). */
LIBMR_API void MR_DrainForFork(void);
LIBMR_API void MR_ResumeAfterFork(void);

/* Initialize mr library */
LIBMR_API int MR_Init(struct RedisModuleCtx* ctx, size_t numThreads, char *password);

Expand Down
9 changes: 9 additions & 0 deletions src/utils/thpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,15 @@ int mr_thpool_num_threads_working(mr_thpool_* thpool_p) {
return thpool_p->num_threads_working;
}

int mr_thpool_num_jobs_in_queue(mr_thpool_* thpool_p) {
/* len is written under rwmutex (jobqueue_push/pull); read it under the same
* lock to avoid a data race. */
pthread_mutex_lock(&thpool_p->jobqueue.rwmutex);
int len = thpool_p->jobqueue.len;
pthread_mutex_unlock(&thpool_p->jobqueue.rwmutex);
return len;
}
Comment thread
cursor[bot] marked this conversation as resolved.

/* ============================ THREAD ============================== */

/* Initialize a thread in the thread pool
Expand Down
8 changes: 8 additions & 0 deletions src/utils/thpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ void mr_thpool_destroy(mr_threadpool);
*/
int mr_thpool_num_threads_working(mr_threadpool);

/**
* @brief Number of jobs queued but not yet picked up by a worker.
*
* @param threadpool the threadpool of interest
* @return integer number of queued jobs
*/
int mr_thpool_num_jobs_in_queue(mr_threadpool);


#ifdef __cplusplus
}
Expand Down
Loading