Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/mr.c
Original file line number Diff line number Diff line change
Expand Up @@ -1564,6 +1564,24 @@ int MR_Init(RedisModuleCtx* ctx, size_t numThreads, char *password) {
return REDISMODULE_OK;
}

int MR_ResizeExecutionThreadPoolIfUnstarted(size_t numThreads) {
mr_threadpool tp = mrCtx.executionsThreadPool;

/* MR_Init() not called yet. */
if (!tp) {
return REDISMODULE_OK;
}
Comment thread
cursor[bot] marked this conversation as resolved.

if (mr_thpool_workers_started(tp)) {
return REDISMODULE_ERR;
}

if (mr_thpool_resize_unstarted(tp, numThreads) != 0) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implicit size_t-to-int narrowing truncates thread count

Medium Severity

MR_ResizeExecutionThreadPoolIfUnstarted accepts size_t numThreads but passes it directly to mr_thpool_resize_unstarted, which takes int num_threads. On 64-bit systems, values exceeding INT_MAX are silently truncated. The <= 0 guard inside the callee catches some overflow cases but not all — a size_t like (1UL << 32) + 5 truncates to 5, passing validation and allocating the wrong number of thread slots.

Additional Locations (1)
Fix in Cursor Fix in Web

return REDISMODULE_ERR;
}
return REDISMODULE_OK;
}

int MR_RegisterObject(MRObjectType* t) {
mrCtx.objectTypesDict = array_append(mrCtx.objectTypesDict, t);
t->id = array_len(mrCtx.objectTypesDict) - 1;
Expand Down
3 changes: 3 additions & 0 deletions src/mr.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ LIBMR_API void MR_FreeExecution(Execution* e);
/* Initialize mr library */
LIBMR_API int MR_Init(struct RedisModuleCtx* ctx, size_t numThreads, char *password);

/* Resize the execution thread pool with a new size if worker threads were never started. */
LIBMR_API int MR_ResizeExecutionThreadPoolIfUnstarted(size_t numThreads);

/* Register a new object type */
LIBMR_API int MR_RegisterObject(MRObjectType* t);

Expand Down
41 changes: 39 additions & 2 deletions src/utils/thpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ struct mr_thpool_* mr_thpool_init(int num_threads) {
threads_on_hold = 0;
threads_keepalive = 1;

if (num_threads < 0) {
num_threads = 0;
if (num_threads <= 0) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the old code allowed 0? Was it "unlimited" maybe?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think so. When it was 0, its just kept allocating 0 bytes for the threads, which is 0 threads(and is there a reason we should allow it)?

err("thpool_init(): num_threads must be greater than 0\n");
return NULL;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mr_thpool_init returning NULL for zero is unchecked

Medium Severity

mr_thpool_init now returns NULL when num_threads is 0, but its only caller MR_Init does not check the return value and unconditionally returns REDISMODULE_OK. Previously, passing 0 created a valid (though empty) pool object. Now, mrCtx.executionsThreadPool is left NULL, and the first call to mr_thpool_add_work will dereference it in mr_thpool_start_threads, causing a NULL pointer crash.

Additional Locations (1)
Fix in Cursor Fix in Web

}

/* Make new thread pool */
Expand Down Expand Up @@ -180,6 +181,42 @@ struct mr_thpool_* mr_thpool_init(int num_threads) {
return thpool_p;
}

int mr_thpool_workers_started(mr_threadpool thpool) {
return thpool != NULL && thpool->is_threads_started;
Comment thread
AvivDavid23 marked this conversation as resolved.
Outdated
}

int mr_thpool_resize_unstarted(mr_threadpool thpool, int num_threads) {

if (thpool == NULL) {
return -1;
}

if (num_threads <= 0) {
err("mr_thpool_resize_unstarted(): num_threads must be greater than 0\n");
return -1;
}

pthread_mutex_lock(&thpool->is_threads_started_lock);
if (thpool->is_threads_started) {
pthread_mutex_unlock(&thpool->is_threads_started_lock);
err("mr_thpool_resize_unstarted(): workers already started\n");
return -1;
}

mr_thread** new_pool =
(mr_thread**)MR_REALLOC(thpool->threads, (size_t)num_threads * sizeof(struct mr_thread*));
if (new_pool == NULL) {
pthread_mutex_unlock(&thpool->is_threads_started_lock);
err("mr_thpool_resize_unstarted(): realloc failed\n");
return -1;
}

thpool->threads = new_pool;
thpool->total_num_of_threads = num_threads;
pthread_mutex_unlock(&thpool->is_threads_started_lock);
return 0;
}
Comment thread
AvivDavid23 marked this conversation as resolved.

/* Add work to the thread pool */
int mr_thpool_add_work(mr_thpool_* thpool_p, void (*function_p)(void*), void* arg_p) {
mr_job* newjob;
Expand Down
9 changes: 9 additions & 0 deletions src/utils/thpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ typedef struct mr_thpool_* mr_threadpool;
*/
mr_threadpool mr_thpool_init(int num_threads);

/** @return Non-zero if worker pthreads have been started, 0 otherwise. */
int mr_thpool_workers_started(mr_threadpool thpool);

/**
* Resize an unstarted pool: realloc the thread pointer table and update
* total_num_of_threads; job queue and sync primitives are unchanged.
* @return 0 on success, -1 on failure (pool unchanged on failure).
*/
int mr_thpool_resize_unstarted(mr_threadpool thpool, int num_threads);

/**
* @brief Add work to the job queue
Expand Down
Loading