
Scheduling problem with bthread_interrupt in bthread_stop #3349

@lghost1999

Description


Describe the bug
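bthread_stop sends a stop request to a running bthread: it marks the bthread as no longer runnable and asks it to terminate as soon as possible. Internally it calls bthread_interrupt to wake the bthread up in case it is blocked. At present, this path can wrongly schedule the woken bthread onto the default tagged group. The call chain is as follows: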

Layer 1: bthread_stop → bthread_interrupt
// bthread.cpp:375-378
int bthread_stop(bthread_t tid) {
    bthread::TaskGroup::set_stopped(tid);  // set the stop flag in the bthread's TaskMeta to true
    return bthread_interrupt(tid);         // interrupt the bthread if it is blocked
}

Layer 2: bthread_interrupt is called with its default argument
// bthread.h:90
extern int bthread_interrupt(bthread_t tid, bthread_tag_t tag = BTHREAD_TAG_DEFAULT);
// types.h:39
static const bthread_tag_t BTHREAD_TAG_DEFAULT = 0;
// bthread_interrupt(tid) is therefore equivalent to bthread_interrupt(tid, 0)

Layer 3: bthread_interrupt → TaskGroup::interrupt
// bthread.cpp:371-373
int bthread_interrupt(bthread_t tid, bthread_tag_t tag) {
    return bthread::TaskGroup::interrupt(tid, bthread::get_task_control(), tag);
}
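To make the effect of the default argument concrete, here is a minimal C++ model. `interrupt_model`, `stop_model`, `kModelTagDefault` and `g_last_tag_seen` are hypothetical names, not brpc APIs; they only mimic the signatures above to show that a caller which forwards just the tid always ends up passing tag 0.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for illustration only; these are NOT brpc types.
using model_tid_t = std::uint64_t;
using model_tag_t = int;
constexpr model_tag_t kModelTagDefault = 0;  // mirrors BTHREAD_TAG_DEFAULT = 0

// Records the tag that the interrupt step actually received.
static model_tag_t g_last_tag_seen = -1;

// Shaped like bthread_interrupt(): the tag parameter has a default value.
int interrupt_model(model_tid_t tid, model_tag_t tag = kModelTagDefault) {
    (void)tid;              // the model does not look the bthread up
    g_last_tag_seen = tag;  // remember which pool would be chosen
    return 0;
}

// Shaped like the current bthread_stop(): only the tid is forwarded,
// so the compiler silently fills in kModelTagDefault for the tag.
int stop_model(model_tid_t tid) {
    return interrupt_model(tid);
}
```

With this model, `stop_model(42)` leaves `g_last_tag_seen` at 0, while an explicit `interrupt_model(42, 3)` records 3. The compiler emits no warning for the omitted argument, which is why the missing tag is easy to overlook.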

Layer 4: in TaskGroup::interrupt, tag=0 is used to select the worker pool
// task_group.cpp:1092-1126
int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) {
    // Consume current_waiter in the TaskMeta, wake it up then set it back.
    ButexWaiter* w = NULL;
    uint64_t sleep_id = 0;
    int rc = interrupt_and_consume_waiters(tid, &w, &sleep_id);  // set the bthread's interrupt flag to true and fetch what it is currently waiting on (a butex waiter or a timer id)
    if (rc) {
        return rc;
    }
    // a bthread cannot wait on a butex and be sleepy at the same time.
    CHECK(!sleep_id || !w);
    if (w != NULL) {
        erase_from_butex_because_of_interruption(w);
        // If butex_wait() already wakes up before we set current_waiter back,
        // the function will spin until current_waiter becomes non-NULL.
        rc = set_butex_waiter(tid, w);
        if (rc) {
            LOG(FATAL) << "butex_wait should spin until setting back waiter";
            return rc;
        }
    } else if (sleep_id != 0) {         // the bthread is sleeping in bthread_usleep
        if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
            TaskGroup* g = tls_task_group;
            TaskMeta* m = address_meta(tid);
            if (g) {
                g->ready_to_run(m);
            } else {
                if (!c) {     // the current thread is not a worker thread
                    return EINVAL;
                }
                c->choose_one_group(tag)->ready_to_run_remote(m);   // with the default argument tag == 0, so m goes to the remote queue of a tag-0 worker
            }
        }
    }
    return 0;
}

Layer 5: choose_one_group(0) picks a TaskGroup with tag=0
// task_control.cpp:342-351
TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) {
    CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags);
    auto& groups = tag_group(tag);
    const auto ngroup = tag_ngroup(tag).load(butil::memory_order_acquire);
    if (ngroup != 0) {
        return groups[butil::fast_rand_less_than(ngroup)];
    }
    CHECK(false) << "Impossible: ngroup is 0";
    return NULL;
}
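The routing decision in layers 4 and 5 can be modeled roughly as follows. This is a sketch under stated assumptions: `ModelGroup`, `ModelControl` and `wake_from_non_worker` are hypothetical, `std::mt19937` stands in for `butil::fast_rand_less_than`, and only the non-worker branch (no `tls_task_group`) after a successful `unschedule` is modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <random>
#include <vector>

// Hypothetical model of the per-tag worker pools; NOT brpc's real classes.
struct ModelGroup {
    int tag;                     // tag of the pool this worker belongs to
    std::vector<int> remote_rq;  // ids of bthreads pushed via the remote queue
};

struct ModelControl {
    // groups[tag] holds the workers of that tag, like tag_group(tag).
    std::vector<std::vector<ModelGroup>> groups;

    ModelControl(int ntags, int per_tag) : groups(static_cast<std::size_t>(ntags)) {
        for (int t = 0; t < ntags; ++t) {
            groups[t].resize(static_cast<std::size_t>(per_tag), ModelGroup{t, {}});
        }
    }

    // Mirrors choose_one_group(tag): a random worker, but only inside `tag`.
    ModelGroup* choose_one_group(int tag) {
        assert(tag >= 0 && tag < static_cast<int>(groups.size()));
        auto& pool = groups[tag];
        if (pool.empty()) return nullptr;  // brpc CHECK-fails here instead
        static std::mt19937 rng(12345);
        std::uniform_int_distribution<std::size_t> pick(0, pool.size() - 1);
        return &pool[pick(rng)];
    }
};

// Mirrors the non-worker branch of TaskGroup::interrupt(): the woken bthread
// is pushed to a worker chosen by `tag`, whatever tag the bthread itself has.
// Returns the tag of the worker that received the bthread, or -1.
int wake_from_non_worker(ModelControl& c, int bthread_id, int tag) {
    ModelGroup* g = c.choose_one_group(tag);
    if (g == nullptr) return -1;
    g->remote_rq.push_back(bthread_id);
    return g->tag;
}
```

With two tags, waking a bthread that belongs to tag 1 through `wake_from_non_worker(c, id, 0)` lands it on a tag-0 worker, which is what the default argument causes; passing 1 keeps it inside its own pool.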

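The bug is triggered when a non-worker thread calls bthread_stop() while the target bthread is sleeping: the bthread is then pushed onto the remote queue of some worker with tag=0, regardless of its own tag.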

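Inside the framework, when a brpc server is destroyed, _derivative_thread and saved_close_idle_tid are stopped. Both bthreads run their logic periodically via bthread_usleep, and the server destructor usually runs on a non-worker thread, typically the main thread. If either bthread is sleeping at that moment, this hits the choose_one_group(tag) branch of TaskGroup::interrupt, and the bthread is scheduled onto a worker with tag=0. The call chain is: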

Layer 1: server.cpp 449-499
Server::~Server() {
    Stop(0);
    Join();
    ......
}

Layer 2: server.cpp 1316-1364
int Server::Join() {
    if (_status != RUNNING && _status != STOPPING) {
        return -1;
    }
    if (_am) {
        _am->Join();
    }
    ......
    // Have to join _derivative_thread, which may assume that server is running
    // and services in server are not mutated, otherwise data race happens
    // between Add/RemoveService after Join() and the thread.
    if (_derivative_thread != INVALID_BTHREAD) {
        bthread_stop(_derivative_thread);
        bthread_join(_derivative_thread, NULL);
        _derivative_thread = INVALID_BTHREAD;
    }

    g_running_server_count.fetch_sub(1, butil::memory_order_relaxed);
    _status = READY;
    return 0;
}

Layer 3: acceptor.cpp 176-200
void Acceptor::Join() {
    std::unique_lock<butil::Mutex> mu(_map_mutex);
    if (_status != STOPPING && _status != RUNNING) {  // no need to join.
        return;
    }
    // `_listened_fd' will be set to -1 once it has been recycled
    while (_listened_fd > 0 || !_socket_map.empty()) {
        _empty_cond.Wait();
    }
    const int saved_idle_timeout_sec = _idle_timeout_sec;
    _idle_timeout_sec = 0;
    const bthread_t saved_close_idle_tid = _close_idle_tid;
    mu.unlock();

    // Join the bthread outside lock.
    if (saved_idle_timeout_sec > 0) {
        bthread_stop(saved_close_idle_tid);
        bthread_join(saved_close_idle_tid, NULL);
    }
    
    {
        BAIDU_SCOPED_LOCK(_map_mutex);
        _status = READY;
    }
}
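Both stopped bthreads follow a "sleep periodically, exit when stopped" pattern, so bthread_stop has to wake them from bthread_usleep rather than wait for the next timeout. The sketch below is a pthread-level analog using `std::condition_variable::wait_for`; `PeriodicWorker` is hypothetical and not brpc code. It illustrates only why the interrupt is needed: a plain std::thread has no tag, so it cannot show where the woken bthread is resumed, which is the actual subject of this issue.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical analog of a periodically sleeping bthread; NOT brpc code.
class PeriodicWorker {
public:
    explicit PeriodicWorker(std::chrono::milliseconds period)
        : period_(period), thread_([this] { Run(); }) {}

    // Analog of bthread_stop(): set the stop flag, then wake the sleeper.
    void Stop() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            stopped_ = true;
        }
        cv_.notify_all();  // plays the role of bthread_interrupt()
    }

    // Analog of bthread_join().
    void Join() {
        if (thread_.joinable()) thread_.join();
    }

    ~PeriodicWorker() { Stop(); Join(); }

    int ticks() const { return ticks_.load(); }

private:
    void Run() {
        std::unique_lock<std::mutex> lk(mu_);
        while (!stopped_) {
            // Analog of bthread_usleep(period): returns early once stopped_ is set.
            cv_.wait_for(lk, period_, [this] { return stopped_; });
            if (!stopped_) ++ticks_;  // the periodic work would go here
        }
    }

    std::chrono::milliseconds period_;
    std::mutex mu_;
    std::condition_variable cv_;
    bool stopped_ = false;
    std::atomic<int> ticks_{0};
    std::thread thread_;  // declared last so it starts after the fields above
};
```

Constructing `PeriodicWorker` with a 10-second period and calling `Stop()` then `Join()` returns almost immediately, just as `bthread_stop` followed by `bthread_join` is expected to in `Server::Join()` and `Acceptor::Join()`.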

Expected behavior

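Before tagged groups were introduced, TaskGroup::interrupt called c->choose_one_group() without a tag argument, which simply handed the bthread to the remote queue of a random worker. Now that tagged groups exist, the expected behavior is to hand it to a worker with the same tag as the interrupted bthread, so that isolation between tags is preserved.
One way to achieve this is to fetch the bthread's tag inside bthread_stop before calling bthread_interrupt, and pass it in as the argument: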

// bthread.cpp 375-378
int bthread_stop(bthread_t tid) {
    bthread::TaskGroup::set_stopped(tid);
    return bthread_interrupt(tid);
}

// After the change
int bthread_stop(bthread_t tid) {
    bthread::TaskGroup::set_stopped(tid);
    bthread_attr_t attr;
    bthread_tag_t tag = BTHREAD_TAG_DEFAULT;
    if (bthread::TaskGroup::get_attr(tid, &attr) == 0 && attr.tag != BTHREAD_TAG_INVALID) {
        tag = attr.tag;
    }
    return bthread_interrupt(tid, tag);
}
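The proposed change can be checked against a small model. `ModelMeta`, `g_metas`, `get_tag_model`, `interrupt_model` and `stop_model` are hypothetical stand-ins for TaskMeta, `TaskGroup::get_attr`, `bthread_interrupt` and `bthread_stop`; the value -1 for the invalid tag is an assumption of the model.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

// Hypothetical model of the proposed fix; NOT brpc's real code.
using model_tid_t = std::uint64_t;
using model_tag_t = int;
constexpr model_tag_t kModelTagDefault = 0;   // mirrors BTHREAD_TAG_DEFAULT
constexpr model_tag_t kModelTagInvalid = -1;  // mirrors BTHREAD_TAG_INVALID (value assumed)

struct ModelMeta {
    bool stopped = false;
    model_tag_t tag = kModelTagInvalid;  // tag recorded in the bthread's attributes
};

// Stand-in for the TaskMeta lookup behind TaskGroup::get_attr().
static std::unordered_map<model_tid_t, ModelMeta> g_metas;

// Shaped like get_attr(): 0 on success, non-zero if the tid is unknown.
int get_tag_model(model_tid_t tid, model_tag_t* tag) {
    auto it = g_metas.find(tid);
    if (it == g_metas.end()) return -1;
    *tag = it->second.tag;
    return 0;
}

// Records the tag the interrupt step received.
static model_tag_t g_interrupt_tag = kModelTagInvalid;

int interrupt_model(model_tid_t, model_tag_t tag) {
    g_interrupt_tag = tag;
    return 0;
}

// Shaped like the proposed bthread_stop(): look the tag up first and fall
// back to the default tag only when it cannot be determined.
int stop_model(model_tid_t tid) {
    auto it = g_metas.find(tid);
    if (it != g_metas.end()) it->second.stopped = true;  // like set_stopped()
    model_tag_t tag = kModelTagDefault;
    model_tag_t found = kModelTagInvalid;
    if (get_tag_model(tid, &found) == 0 && found != kModelTagInvalid) {
        tag = found;
    }
    return interrupt_model(tid, tag);
}
```

Falling back to the default tag when the attributes cannot be read keeps the current behavior for such tids, while bthreads with a known tag are woken inside their own group.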

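@yanglimingcn A question for you: is it intentional that bthread_interrupt is called here without a tag, relying on the default argument? Or should the tag be passed in here, so that the bthread is guaranteed to be scheduled onto a worker with the same tag?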
