Source analysis: https://github.com/moyin1004/brpc-learn

bthread structure

bthread data structures

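  • A bthread can be thought of as a user-space thread; scheduling it requires no system call
  • TaskMeta objects are actually allocated from ResourcePool, brpc's object pool
  • Every bthread is in effect a task, and scheduling bthreads means scheduling tasks. Readers familiar with Go's GMP model will find the two very similar: both are built from tasks plus task queues. What distinguishes a bthread or goroutine from an ordinary task is that it can be suspended midway

The data structure behind a bthread is as follows: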
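
To make the object-pool point concrete, here is a minimal single-threaded sketch of a pool that hands out reusable slots. It is not brpc's ResourcePool: TinyMeta and TinyPool are hypothetical names, and the real pool is thread-safe and far more elaborate. The sketch only shows why a pooled object can keep some fields across reuse, which is what the "[Not Reset]" comments in TaskMeta refer to.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for TaskMeta: just the fields this sketch needs.
struct TinyMeta {
    void* (*fn)(void*) = nullptr;
    void* arg = nullptr;
};

// Hypothetical single-threaded pool. Slots are addressed by index, and freed
// slots are recycled before the pool grows.
class TinyPool {
public:
    // Returns the index of a slot that the caller may use.
    std::size_t acquire() {
        if (!free_.empty()) {
            std::size_t id = free_.back();
            free_.pop_back();
            return id;
        }
        slots_.emplace_back();
        return slots_.size() - 1;
    }

    // Marks the slot reusable. The object is not destroyed, so any field the
    // caller does not reset survives until the next acquire().
    void release(std::size_t id) { free_.push_back(id); }

    TinyMeta& at(std::size_t id) { return slots_[id]; }
    std::size_t capacity() const { return slots_.size(); }

private:
    std::vector<TinyMeta> slots_;
    std::vector<std::size_t> free_;
};
```

Acquiring two slots, releasing the first and acquiring again returns the first index, so the pool never grows past two objects in that sequence.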

struct TaskMeta {
    // [Not Reset]
    butil::atomic<ButexWaiter*> current_waiter{NULL};
    uint64_t current_sleep{TimerThread::INVALID_TASK_ID};

    // A flag to mark if the Timer scheduling failed.
    bool sleep_failed{false};

    // A builtin flag to mark if the thread is stopping.
    bool stop{false};

    // The thread is interrupted and should wake up from some blocking ops.
    bool interrupted{false};

    // Scheduling of the thread can be delayed.
    bool about_to_quit{false};
    
    // [Not Reset] guarantee visibility of version_butex.
    pthread_spinlock_t version_lock{};
    
    // [Not Reset] only modified by one bthread at any time, no need to be atomic
    uint32_t* version_butex{NULL};

    // The identifier. It does not have to be here, however many code is
    // simplified if they can get tid from TaskMeta.
    bthread_t tid{INVALID_BTHREAD};

    // User function and argument
    void* (*fn)(void*){NULL};
    void* arg{NULL};

    // Stack of this task.
    ContextualStack* stack{NULL};

    // Attributes creating this task
    bthread_attr_t attr{BTHREAD_ATTR_NORMAL};
    
    // Statistics
    int64_t cpuwide_start_ns{0};
    TaskStatistics stat{};

    // bthread local storage, sync with tls_bls (defined in task_group.cpp)
    // when the bthread is created or destroyed.
    // DO NOT use this field directly, use tls_bls instead.
    LocalStorage local_storage{};

    // Only used when TaskTracer is enabled.
    // Bthread status.
    TaskStatus status{TASK_STATUS_UNKNOWN};
    // Whether bthread is traced?
    bool traced{false};
    // Worker thread id.
    pid_t worker_tid{-1};
};

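The most important fields are the function to run and its argument, void* (*fn)(void*){NULL}; void* arg{NULL};, and the stack, ContextualStack* stack{NULL};, which stores the execution context.

The bthread stack structures are as follows:

  • context is created by bthread_make_fcontext, which is implemented in assembly
  • When a bthread resumes, its registers and call-stack state are restored from the ContextualStack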
struct StackStorage {
    int stacksize;
    int guardsize;
    void* bottom; // pointer to the bottom of the stack; the memory is allocated with mmap
    unsigned valgrind_stack_id;
};

struct ContextualStack {
    bthread_fcontext_t context;
    StackType stacktype;
    StackStorage storage;
};
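
The suspend-and-resume behaviour that a saved context plus a private stack makes possible can be sketched with the POSIX ucontext calls. This swaps in a different technique: brpc uses its own assembly routines such as bthread_make_fcontext, not ucontext. All names below (g_main_ctx, g_task_ctx, task_body, run_suspendable_task) are hypothetical, and the stack is a static array rather than mmap'd memory.

```cpp
#include <cassert>
#include <ucontext.h>
#include <vector>

// All names here are hypothetical; this is not brpc code.
static ucontext_t g_main_ctx;     // saved context of the caller (the "main" task)
static ucontext_t g_task_ctx;     // saved context of the suspendable task
static std::vector<int> g_trace;  // order in which the steps ran

// Task body: records a step, suspends itself, then records another step.
static void task_body() {
    g_trace.push_back(1);
    swapcontext(&g_task_ctx, &g_main_ctx);  // suspend: save state, jump to main
    g_trace.push_back(3);
    // Returning from here continues at uc_link, which is g_main_ctx.
}

// Runs the task in two slices and returns the recorded order of steps.
std::vector<int> run_suspendable_task() {
    static char stack[64 * 1024];            // the task's private stack
    g_trace.clear();
    getcontext(&g_task_ctx);
    g_task_ctx.uc_stack.ss_sp = stack;
    g_task_ctx.uc_stack.ss_size = sizeof(stack);
    g_task_ctx.uc_link = &g_main_ctx;        // where to go when task_body returns
    makecontext(&g_task_ctx, task_body, 0);

    swapcontext(&g_main_ctx, &g_task_ctx);   // first slice: runs up to the suspend
    g_trace.push_back(2);
    swapcontext(&g_main_ctx, &g_task_ctx);   // second slice: resumes after the suspend
    return g_trace;
}
```

The returned trace is 1, 2, 3: the task starts, the caller runs while the task is suspended, and the task then continues from exactly where it stopped.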

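The bthread attribute type is bthread_attr_t. Its stack_type and flags fields are 4-byte integers, so flags can carry up to 32 flag bits.

  • stack_type is one of:
    • the same as a native pthread
    • small: 32768 B (32 KB) by default
    • normal: 1048576 B (1 MB) by default; the epoll bthread uses this size
    • large: 8 MB
  • flags is tested bit by bit, so several flags can be set at once
    • BTHREAD_LOG_START_AND_FINISH: log when the bthread starts and finishes
    • BTHREAD_LOG_CONTEXT_SWITCH: log on every bthread context switch
    • BTHREAD_NOSIGNAL: used to batch bthreads that are started together; they are queued without waking a worker, and bthread_flush then wakes workers for the pending batch
    • BTHREAD_NEVER_QUIT: marks a bthread that never exits, for example the epoll bthread
    • BTHREAD_INHERIT_SPAN: purpose not yet clear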
typedef struct bthread_attr_t {
    bthread_stacktype_t stack_type;
    bthread_attrflags_t flags;
    bthread_keytable_pool_t* keytable_pool;
    // Used to group bthreads
    bthread_tag_t tag;
} bthread_attr_t;
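
How one integer can hold several flags at once can be sketched as follows. The type and constant names (demo_attrflags_t, DEMO_*) and the bit positions are hypothetical and chosen only for illustration; they are not the values defined in brpc's headers.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical flag type and values; only the one-bit-per-flag layout matters.
typedef std::uint32_t demo_attrflags_t;
const demo_attrflags_t DEMO_LOG_START_AND_FINISH = 1u << 0;
const demo_attrflags_t DEMO_LOG_CONTEXT_SWITCH   = 1u << 1;
const demo_attrflags_t DEMO_NOSIGNAL             = 1u << 2;
const demo_attrflags_t DEMO_NEVER_QUIT           = 1u << 3;

// True when every bit of `flag` is set in `flags`.
inline bool has_flag(demo_attrflags_t flags, demo_attrflags_t flag) {
    return (flags & flag) == flag;
}

// Returns `flags` with the bits of `flag` cleared.
inline demo_attrflags_t clear_flag(demo_attrflags_t flags, demo_attrflags_t flag) {
    return flags & ~flag;
}
```

Combining flags is a bitwise OR, for example DEMO_NOSIGNAL | DEMO_NEVER_QUIT, and each flag can then be tested independently with has_flag.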

bthread API

  • bthread in brpc mirrors the Linux pthread API almost one to one, including user-space locks and condition variables

brpc bthread API           Linux pthread API
bthread_start_background   pthread_create
bthread_join               pthread_join
bthread_usleep             usleep
bthread_yield              sched_yield
bthread_exit               pthread_exit
bthread_mutex_init         pthread_mutex_init
bthread_mutex_lock         pthread_mutex_lock
bthread_cond_init          pthread_cond_init
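
As a reference point for the mapping, here is the pthread side of the table in a form that builds without brpc (on older toolchains it may need -pthread). The helper names add_one and run_and_join are hypothetical. Going by the table, a bthread version would call bthread_start_background and bthread_join in place of pthread_create and pthread_join; that variant is not shown here because it needs the brpc headers.

```cpp
#include <cassert>
#include <pthread.h>
#include <sched.h>

// Thread function with the void* (*)(void*) shape that TaskMeta::fn also has.
static void* add_one(void* arg) {
    int* value = static_cast<int*>(arg);
    sched_yield();  // the table lists bthread_yield as the counterpart
    *value += 1;
    return value;
}

// Starts one thread, waits for it, and returns the value it produced.
// Returns -1 if the thread could not be created.
int run_and_join(int start) {
    int value = start;
    pthread_t tid;
    if (pthread_create(&tid, nullptr, add_one, &value) != 0) {
        return -1;
    }
    void* ret = nullptr;
    pthread_join(tid, &ret);  // the table lists bthread_join as the counterpart
    return *static_cast<int*>(ret);
}
```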

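TaskControl and TaskGroup

TaskControl is the bthread scheduler; it manages bthreads and the bthread task groups (TaskGroup).

  • brpc runs bthread tasks on a pool of worker threads. The pool size can be set by a parameter and defaults to 9 threads; the entry function of each OS thread is void* TaskControl::worker_thread(void* arg)
// Global variable
TaskControl* g_task_control = NULL;
// Creates the TaskControl; a mutex guarantees that only one is created
inline TaskControl* get_or_new_task_control();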

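g_task_control is really used as an atomic variable, butil::atomic<TaskControl*>*; it is declared as a plain pointer because of global-variable initialization order issues.

TaskGroup is a group of bthread tasks. It owns two task queues: a local queue and a remote queue.

  • Every worker thread has its own TaskGroup instance for managing bthread tasks; it can be reached through extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;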
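
The "create once under a mutex, read through an atomic pointer" pattern can be sketched with the standard library. This is an illustration under assumptions, not brpc's code: DemoControl, g_demo_control and get_or_new_demo_control are hypothetical names, the sketch declares a real std::atomic instead of reinterpreting a plain pointer, and the value 9 simply echoes the default thread count mentioned above.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical stand-in for TaskControl.
struct DemoControl {
    int concurrency;
};

static std::atomic<DemoControl*> g_demo_control{nullptr};
static std::mutex g_demo_control_mutex;

// Returns the single DemoControl, creating it on first use.
DemoControl* get_or_new_demo_control() {
    // Fast path: already created, no lock needed.
    DemoControl* c = g_demo_control.load(std::memory_order_acquire);
    if (c != nullptr) {
        return c;
    }
    // Slow path: serialize creators and check again under the lock.
    std::lock_guard<std::mutex> guard(g_demo_control_mutex);
    c = g_demo_control.load(std::memory_order_relaxed);
    if (c == nullptr) {
        c = new DemoControl{9};
        // Publish only after the object is fully constructed.
        g_demo_control.store(c, std::memory_order_release);
    }
    return c;
}
```

Every call returns the same pointer; the mutex is taken only while the pointer is still null.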
class TaskGroup {
private:
    WorkStealingQueue<bthread_t> _rq; // Local queue: bthreads started from this worker thread go straight here
    RemoteTaskQueue _remote_rq;       // Remote queue: holds bthreads handed over by non-worker threads
};

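bthread scheduling flow

  • In bthread_start_background, if the calling thread has its own task_group and no tag attribute is specified, the task is pushed directly onto the local queue _rq; in all other cases it goes to _remote_rq (a queue protected by a lock)
  • The source shows that task_runner is the function that executes a bthread. Inside sched_to the worker switches to the bthread that should run; when task_runner finishes, execution returns to the main bthread at the line TaskGroup::sched_to(&dummy, tid); and continues from there, so the code inside the if statement never runs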
void TaskGroup::run_main_task() {
    bvar::PassiveStatus<double> cumulated_cputime(
        get_cumulated_cputime_from_this, this);
    std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;
    LOG(INFO) << "TaskGroup::run_main_task:" << this->_tid;

    TaskGroup* dummy = this;
    bthread_t tid;
    // Take the next bthread from the task queues
    while (wait_task(&tid)) {
        LOG(INFO) << "cur_id:" << _cur_meta->tid << " switch to bthread:" << tid << " main_id:" << _main_tid;
        TaskGroup::sched_to(&dummy, tid);
        LOG(INFO) << "sched_to end";
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            // Never reached
            LOG(INFO) << "task_runner start";
            TaskGroup::task_runner(1/*skip remained*/);
            LOG(INFO) << "task_runner end";
        }
        LOG(INFO) << "cur_id:" << _cur_meta->tid << " targetid:" << tid << " main_id:" << _main_tid;
    }
    // Don't forget to add elapse of last wait_task.
    current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}

void TaskGroup::task_runner(intptr_t skip_remained) {
    TaskGroup* g = tls_task_group;
    if (!skip_remained) {
        // After switching bthreads, put the previous bthread that has not finished back into the run queue
        while (g->_last_context_remained) {
            LOG(INFO) << "taskgroup task_runner run _last_context_remained";
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
        }
    }
    do {
        TaskMeta* const m = g->_cur_meta;
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }
        g->_control->_nbthreads << -1;
        g->_control->tag_nbthreads(g->tag()) << -1;
        g->set_remained(TaskGroup::_release_last_context, m);
        ending_sched(&g); // Switches back to the main bthread, i.e. main_tid
    } while (g->_cur_meta->tid != g->_main_tid);
}
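
The queue routing rule and the "pick a task, run it, come back" loop can be condensed into a small model. This is a simplified sketch, not the real TaskGroup: DemoGroup and its members are hypothetical, tasks run to completion instead of being switched to, next_task returns instead of blocking, and serving the local queue before the remote one is an assumption of the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical miniature of a task group; there is no context switching here.
class DemoGroup {
public:
    using Task = std::function<void()>;

    // Routing rule from the text: the local queue is used only when the caller
    // is this group's own worker and no tag was given; everything else goes to
    // the lock-protected remote queue.
    void start(Task task, bool from_own_worker, bool has_tag) {
        if (from_own_worker && !has_tag) {
            local_.push_back(std::move(task));
        } else {
            std::lock_guard<std::mutex> guard(remote_mutex_);
            remote_.push_back(std::move(task));
        }
    }

    // Non-blocking stand-in for wait_task: false when both queues are empty.
    bool next_task(Task* out) {
        if (!local_.empty()) {
            *out = std::move(local_.front());
            local_.pop_front();
            return true;
        }
        std::lock_guard<std::mutex> guard(remote_mutex_);
        if (remote_.empty()) {
            return false;
        }
        *out = std::move(remote_.front());
        remote_.pop_front();
        return true;
    }

    // Counterpart of the run_main_task loop; returns how many tasks ran.
    int run_main_task() {
        int executed = 0;
        Task task;
        while (next_task(&task)) {
            task();  // in brpc this is where sched_to switches to the bthread
            ++executed;
        }
        return executed;
    }

    std::size_t local_size() const { return local_.size(); }
    std::size_t remote_size() {
        std::lock_guard<std::mutex> guard(remote_mutex_);
        return remote_.size();
    }

private:
    std::deque<Task> local_;   // touched only by the owning worker
    std::deque<Task> remote_;  // shared, guarded by remote_mutex_
    std::mutex remote_mutex_;
};
```

Starting one task from outside the worker, one from the worker without a tag, and one from the worker with a tag leaves one task in the local queue and two in the remote queue; run_main_task then executes all three.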

bthread execution flow

References