Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,10 @@ static int MR_TlsPasswordCallback(char *buf, int size, int rwflag, void *u) {
}

SSL_CTX* MR_CreateSSLContext(const char *cacert_filename,
const char *cert_filename,
const char *private_key_filename,
const char *private_key_pass,
redisSSLContextError *error)
const char *cert_filename,
const char *private_key_filename,
const char *private_key_pass,
redisSSLContextError *error)
{
SSL_CTX *ssl_ctx = SSL_CTX_new(SSLv23_client_method());
if (!ssl_ctx) {
Expand Down Expand Up @@ -748,6 +748,8 @@ static void MR_NodeFree(Node* n){
}

static void MR_ClusterFree(){
MR_AbortRunningExecutions();

if(clusterCtx.CurrCluster->myId){
MR_FREE(clusterCtx.CurrCluster->myId);
}
Expand Down
41 changes: 38 additions & 3 deletions src/mr.c
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,15 @@ static void MR_ExecutionTimedOut(void* ctx) {
MR_ExecutionAddTask(e, MR_ExecutionTimedOutInternal, NULL);
}

// Thread pool task: fires the done callback with a cluster topology change error
static void MR_ExecutionAbortedOnClusterChange(Execution* e, void* pd) {
e->errors = array_append(e->errors, MR_ErrorRecordCreate("cluster topology changed"));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we using new string ?
shoudent we use the exsiting SLOT_RANGES_ERROR = "Query requires unavailable slots" string ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will use it in ts. It is different than the slot-ranges error and from the timeout error, so deserves a new description.

if (!MR_ExecutionInvokeCallback(e, &e->callbacks.done))
return;
e->callbacks.done.callback = NULL; /* make sure the done callback will not be called again */
MR_FreeExecution(e);
}

static void MR_ExecutionMain(void* pd) {
Execution* e = pd;
pthread_mutex_lock(&e->eLock);
Expand All @@ -1317,9 +1326,10 @@ static void MR_ExecutionMain(void* pd) {

ExecutionTaskCallback callback = task->callback;
callback(e, task->pd);
if (callback == MR_DisposeExecution || callback == MR_ExecutionTimedOutInternal) {
/* MR_DisposeExecution means we will not longer gets any events
* on this execution and we should not longer touch it. */
if (callback == MR_DisposeExecution ||
callback == MR_ExecutionTimedOutInternal ||
callback == MR_ExecutionAbortedOnClusterChange) {
// These callbacks dispose the execution; we must not touch it afterwards
return;
}

Expand Down Expand Up @@ -1426,6 +1436,31 @@ LIBMR_API void MR_ExecutionCtxSetError(ExecutionCtx* ectx, const char* err, size
ectx->err = MR_ErrorRecordCreate(error);
}

LIBMR_API void MR_AbortRunningExecutions(void) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MR_AbortRunningExecutions runs on the event loop and only queues MR_ExecutionAbortedOnClusterChange on the thread pool. MR_ClusterFree then proceeds synchronously and frees nodes/cluster state while worker threads may still run the user onDone callback afterward.

That’s the same async pattern as idle timeout, but it means onDone can run when the cluster is already torn down or partially rebuilt. Worth a short comment near MR_AbortRunningExecutions or MR_ClusterFree so callers know not to assume cluster topology is stable inside onDone for this error path (unless we later add a stronger synchronization barrier).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

// Collect pointers first to avoid mutating the dict while iterating
ARR(Execution*) to_abort = array_new(Execution*, 10);
mr_dictIterator* iter = mr_dictGetIterator(mrCtx.executionsDict);
mr_dictEntry* entry;
while ((entry = mr_dictNext(iter))) {
Execution* e = mr_dictGetVal(entry);
if (e->flags & ExecutionFlag_Initiator)
to_abort = array_append(to_abort, e);
}
mr_dictReleaseIterator(iter);

for (size_t i = 0; i < array_len(to_abort); ++i) {
Execution* e = to_abort[i];
// Cancel the pending idle-timeout timer if it is already armed
if (e->timeoutTask) {
MR_EventLoopDelayTaskCancel(e->timeoutTask);
e->timeoutTask = NULL;
}
mr_dictDelete(mrCtx.executionsDict, e->id);
MR_ExecutionAddTask(e, MR_ExecutionAbortedOnClusterChange, NULL);
}
array_free(to_abort);
}

static void MR_StepDispose(Step* s) {
if (s->bStep.name) {
MR_FREE(s->bStep.name);
Expand Down
2 changes: 2 additions & 0 deletions src/mr.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ LIBMR_API const char* MR_ExecutionCtxGetError(ExecutionCtx* ectx, size_t i);
LIBMR_API size_t MR_ExecutionCtxGetErrorsLen(ExecutionCtx* ectx);
LIBMR_API void MR_ExecutionCtxSetError(ExecutionCtx* ectx, const char* err, size_t len);

LIBMR_API void MR_AbortRunningExecutions(void);

/* Execution Callback definition */
typedef void(*ExecutionCallback)(ExecutionCtx* ectx, void* pd);

Expand Down
Loading