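brpc Source Code: bthread Principles and Scheduling
Source analysis: https://github.com/moyin1004/brpc-learn
bthread Structure
bthread Data Structures
- A bthread can be thought of as a user-space thread; scheduling it requires no system call.
- TaskMeta objects are allocated from ResourcePool, brpc's object pool.
- Every bthread is in effect a task, and scheduling bthreads means scheduling tasks. Anyone familiar with Go's GMP model will find the two very similar: both are built from tasks plus task queues. What sets a bthread or goroutine apart from an ordinary task is that it can be suspended midway.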
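To make the object-pool idea concrete, here is a minimal sketch of a slot-based pool. SimplePool and its acquire/address/release methods are hypothetical names invented for this illustration; this is not brpc's ResourcePool, which is considerably more elaborate. What it shows is that a released slot is handed out again without being reset, which may be why some TaskMeta fields carry a [Not Reset] comment.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical, single-threaded pool sketch (not brpc's ResourcePool).
template <typename T>
class SimplePool {
public:
    // Hands out a slot id, reusing a previously released slot if possible.
    std::uint32_t acquire() {
        if (!free_ids_.empty()) {
            std::uint32_t id = free_ids_.back();
            free_ids_.pop_back();
            return id;
        }
        slots_.emplace_back();
        return static_cast<std::uint32_t>(slots_.size() - 1);
    }

    // Maps an id back to the object; returns nullptr for unknown ids.
    T* address(std::uint32_t id) {
        return id < slots_.size() ? &slots_[id] : nullptr;
    }

    // Marks a slot as reusable. The object itself is left untouched.
    void release(std::uint32_t id) {
        assert(id < slots_.size());
        free_ids_.push_back(id);
    }

    std::size_t capacity() const { return slots_.size(); }

private:
    std::deque<T> slots_;                  // deque keeps addresses stable on growth
    std::vector<std::uint32_t> free_ids_;  // ids that are free for reuse
};
```

In this sketch a task would hold only the small integer id, and the pool resolves it to a pointer on demand.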
The data structure behind a bthread is as follows:
struct TaskMeta {
    // [Not Reset]
    butil::atomic<ButexWaiter*> current_waiter{NULL};
    uint64_t current_sleep{TimerThread::INVALID_TASK_ID};
    // A flag to mark if the Timer scheduling failed.
    bool sleep_failed{false};
    // A builtin flag to mark if the thread is stopping.
    bool stop{false};
    // The thread is interrupted and should wake up from some blocking ops.
    bool interrupted{false};
    // Scheduling of the thread can be delayed.
    bool about_to_quit{false};
    // [Not Reset] guarantee visibility of version_butex.
    pthread_spinlock_t version_lock{};
    // [Not Reset] only modified by one bthread at any time, no need to be atomic
    uint32_t* version_butex{NULL};
    // The identifier. It does not have to be here, however many code is
    // simplified if they can get tid from TaskMeta.
    bthread_t tid{INVALID_BTHREAD};
    // User function and argument
    void* (*fn)(void*){NULL};
    void* arg{NULL};
    // Stack of this task.
    ContextualStack* stack{NULL};
    // Attributes creating this task
    bthread_attr_t attr{BTHREAD_ATTR_NORMAL};
    // Statistics
    int64_t cpuwide_start_ns{0};
    TaskStatistics stat{};
    // bthread local storage, sync with tls_bls (defined in task_group.cpp)
    // when the bthread is created or destroyed.
    // DO NOT use this field directly, use tls_bls instead.
    LocalStorage local_storage{};
    // Only used when TaskTracer is enabled.
    // Bthread status.
    TaskStatus status{TASK_STATUS_UNKNOWN};
    // Whether bthread is traced?
    bool traced{false};
    // Worker thread id.
    pid_t worker_tid{-1};
};
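The most important fields are the function to run and its argument, void* (*fn)(void*) and void* arg, together with the stack, ContextualStack* stack, which stores the execution context.
The bthread stack is laid out as follows:
- context is created by the bthread_make_fcontext function, which is implemented in assembly.
- When a bthread resumes, its registers and function-stack state are restored from the ContextualStack.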
struct StackStorage {
    int stacksize;
    int guardsize;
    void* bottom; // pointer to the bottom of the stack; the memory is allocated with mmap
    unsigned valgrind_stack_id;
};
struct ContextualStack {
    bthread_fcontext_t context;
    StackType stacktype;
    StackStorage storage;
};
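brpc's context routines are hand-written assembly, so they are not reproduced here. As a stand-in, the sketch below uses the POSIX ucontext functions (getcontext, makecontext, swapcontext) to show the same idea: a task suspends midway and later resumes on its own stack with its state intact. The names ctx_sketch, task_entry and run_context_demo, and the 64 KB stack size, are assumptions of this sketch, and it assumes a Linux/glibc system.

```cpp
#include <ucontext.h>

#include <cassert>
#include <string>
#include <vector>

namespace ctx_sketch {

ucontext_t g_main_ctx;              // saved state of the caller ("main task")
ucontext_t g_task_ctx;              // saved state of the user task
std::vector<std::string> g_trace;   // records the order of execution

void task_entry() {
    g_trace.push_back("task: step 1");
    // Suspend: save this task's registers and jump back to the caller.
    swapcontext(&g_task_ctx, &g_main_ctx);
    g_trace.push_back("task: step 2");
    // Returning from the entry function resumes uc_link (g_main_ctx).
}

std::vector<std::string> run_context_demo() {
    static char stack[64 * 1024];   // the task's private stack
    g_trace.clear();

    int rc = getcontext(&g_task_ctx);
    assert(rc == 0);
    (void)rc;
    g_task_ctx.uc_stack.ss_sp = stack;
    g_task_ctx.uc_stack.ss_size = sizeof(stack);
    g_task_ctx.uc_link = &g_main_ctx;
    makecontext(&g_task_ctx, task_entry, 0);

    g_trace.push_back("main: start task");
    swapcontext(&g_main_ctx, &g_task_ctx);   // run until the task suspends
    g_trace.push_back("main: resume task");
    swapcontext(&g_main_ctx, &g_task_ctx);   // run until the task returns
    g_trace.push_back("main: task finished");
    return g_trace;
}

}  // namespace ctx_sketch
```

The trace alternates between the caller and the task, which is the behavior a ContextualStack makes possible for a bthread.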
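bthread attributes are described by bthread_attr_t. Its stack_type and flags fields are both 4-byte integers, so flags can carry up to 32 flag bits.
- stack_type takes one of the following values:
  - pthread: the same as a native pthread
  - small: 32768 B (32 KB) by default
  - normal: 1048576 B (1 MB) by default; the epoll bthread uses this size
  - large: 8 MB
- flags is tested bitwise, so several flags can be set at once:
  - BTHREAD_LOG_START_AND_FINISH: log when the bthread starts and when it finishes
  - BTHREAD_LOG_CONTEXT_SWITCH: log on every bthread context switch
  - BTHREAD_NOSIGNAL: used for batching; bthreads started with this flag do not wake a worker right away, and a later bthread_flush call wakes workers for the whole batch
  - BTHREAD_NEVER_QUIT: marks a bthread that never exits, such as the epoll bthread
  - BTHREAD_INHERIT_SPAN: purpose not yet clear to the author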
typedef struct bthread_attr_t {
    bthread_stacktype_t stack_type;
    bthread_attrflags_t flags;
    bthread_keytable_pool_t* keytable_pool;
    // used to group bthreads
    bthread_tag_t tag;
} bthread_attr_t;
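The bitwise handling of flags can be sketched as follows. The constant names and bit positions here are hypothetical and chosen only for illustration; the real values live in brpc's headers and may differ.

```cpp
#include <cassert>
#include <cstdint>

using AttrFlags = std::uint32_t;  // 4 bytes, so at most 32 independent flag bits
static_assert(sizeof(AttrFlags) == 4, "flags are assumed to be 4 bytes wide");

// Hypothetical flag values: each flag owns exactly one bit.
constexpr AttrFlags kLogStartAndFinish = 1u << 0;
constexpr AttrFlags kLogContextSwitch  = 1u << 1;
constexpr AttrFlags kNoSignal          = 1u << 2;
constexpr AttrFlags kNeverQuit         = 1u << 3;

// Tests whether a given flag bit is set in a combined flags word.
inline bool has_flag(AttrFlags flags, AttrFlags flag) {
    assert(flag != 0);
    return (flags & flag) != 0;
}
```

Flags are combined with bitwise OR, for example `AttrFlags f = kNoSignal | kNeverQuit;`, and each one is then checked independently with has_flag.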
bthread API
- bthread in brpc mirrors the Linux pthread API almost 1:1, including the corresponding user-space mutexes and condition variables.
brpc bthread API | Linux pthread API |
---|---|
bthread_start_background | pthread_create |
bthread_join | pthread_join |
bthread_usleep | usleep |
bthread_yield | sched_yield |
bthread_exit | pthread_exit |
bthread_mutex_init | pthread_mutex_init |
bthread_mutex_lock | pthread_mutex_lock |
bthread_cond_init | pthread_cond_init |
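TaskControl and TaskGroup
TaskControl is the bthread scheduler. It manages bthreads and the bthread task groups (TaskGroup).
- brpc runs bthread tasks on a pool of worker threads. The pool size can be set through a parameter and defaults to 9 threads. The entry function of each OS thread is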
void* TaskControl::worker_thread(void* arg)
// Global variable
TaskControl* g_task_control = NULL;
// Creates the TaskControl; a mutex guarantees that only one instance exists
inline TaskControl* get_or_new_task_control();
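g_task_control is in fact used as an atomic variable, accessed as a butil::atomic<TaskControl*>*. It is declared as a plain pointer because of global-variable initialization-order problems.
- For background on the initialization-order problem, see: https://www.cnblogs.com/catch/p/4314256.html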
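The "atomic pointer plus mutex" pattern described here is commonly written as double-checked locking. The sketch below is an illustration under assumptions, not brpc's code: Controller, g_controller and get_or_new_controller are hypothetical stand-ins, and it uses std::atomic directly instead of reproducing the plain-pointer declaration.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical stand-in for TaskControl.
struct Controller {
    int concurrency;
};

std::atomic<Controller*> g_controller{nullptr};
std::mutex g_controller_mutex;

// Returns the single Controller, creating it on first use.
Controller* get_or_new_controller() {
    // Fast path: already created, no lock needed.
    Controller* c = g_controller.load(std::memory_order_acquire);
    if (c != nullptr) {
        return c;
    }
    // Slow path: take the lock and check again, since another thread
    // may have created the instance in the meantime.
    std::lock_guard<std::mutex> guard(g_controller_mutex);
    c = g_controller.load(std::memory_order_relaxed);
    if (c == nullptr) {
        c = new Controller{9};  // 9 mirrors the default thread count quoted above
        g_controller.store(c, std::memory_order_release);
    }
    assert(c != nullptr);
    return c;  // intentionally never freed: lives for the whole process
}
```

Every caller gets the same pointer, and only the very first calls ever contend on the mutex.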
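TaskGroup is a group of bthread tasks. It owns two task queues: a local queue and a remote queue.
- Each worker thread has its own TaskGroup instance for managing bthread tasks. It is available through the thread-local variable extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;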
class TaskGroup {
private:
    WorkStealingQueue<bthread_t> _rq; // local queue: bthread tasks started by this worker thread go straight here
    RemoteTaskQueue _remote_rq;       // remote queue: holds bthread tasks handed over by non-worker threads
};
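A minimal sketch of the two-queue idea follows. GroupSketch, submit and TaskId are hypothetical names; plain std::deque containers stand in for WorkStealingQueue and RemoteTaskQueue, work stealing is left out, and the "local first, then remote" pop order is an assumption of the sketch rather than a statement about brpc. The routing rule it encodes is: a task goes to the local queue only when the caller is itself a worker with a group and no tag was requested; everything else goes through the lock-protected remote queue.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>

using TaskId = std::uint64_t;  // stand-in for bthread_t

class GroupSketch {
public:
    // Only the owning worker calls this, so no lock is taken here.
    void push_local(TaskId id) { local_.push_back(id); }

    // Any thread may call this, so the queue is guarded by a mutex.
    void push_remote(TaskId id) {
        std::lock_guard<std::mutex> guard(remote_mutex_);
        remote_.push_back(id);
    }

    // Assumed order for this sketch: drain local tasks before remote ones.
    bool pop(TaskId* out) {
        assert(out != nullptr);
        if (!local_.empty()) {
            *out = local_.front();
            local_.pop_front();
            return true;
        }
        std::lock_guard<std::mutex> guard(remote_mutex_);
        if (remote_.empty()) {
            return false;
        }
        *out = remote_.front();
        remote_.pop_front();
        return true;
    }

    std::size_t local_size() const { return local_.size(); }
    std::size_t remote_size() {
        std::lock_guard<std::mutex> guard(remote_mutex_);
        return remote_.size();
    }

private:
    std::deque<TaskId> local_;
    std::deque<TaskId> remote_;
    std::mutex remote_mutex_;
};

// callers_group is nullptr when the caller is not a worker thread.
// Returns true when the task landed in the caller's own local queue.
bool submit(GroupSketch* callers_group, GroupSketch& target, TaskId id, bool has_tag) {
    if (callers_group != nullptr && !has_tag) {
        callers_group->push_local(id);
        return true;
    }
    target.push_remote(id);
    return false;
}
```

Keeping the local path lock-free is the point of the split: the common case of a worker spawning more work for itself avoids the mutex entirely.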
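bthread Scheduling Flow
- On bthread_start_background, if the calling thread has its own task_group and no tag attribute is specified, the task is pushed straight onto the local queue _rq. In every other case it goes to _remote_rq (a lock-protected queue).
- The source shows that task_runner is the function that executes a bthread. Inside sched_to, control switches to the bthread that should run; when task_runner finishes, control returns to the main bthread and execution continues right after the line TaskGroup::sched_to(&dummy, tid); so for a bthread that runs on its own stack, the code inside the if statement is not executed. That branch appears to serve bthreads that share the worker's main stack (the pthread stack type), for which sched_to performs no stack jump.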
void TaskGroup::run_main_task() {
    bvar::PassiveStatus<double> cumulated_cputime(
        get_cumulated_cputime_from_this, this);
    std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;
    LOG(INFO) << "TaskGroup::run_main_task:" << this->_tid;
    TaskGroup* dummy = this;
    bthread_t tid;
    // Take the next bthread from the task queues
    while (wait_task(&tid)) {
        LOG(INFO) << "cur_id:" << _cur_meta->tid << " switch to bthread:" << tid << " main_id:" << _main_tid;
        TaskGroup::sched_to(&dummy, tid);
        LOG(INFO) << "sched_to end";
        DCHECK_EQ(this, dummy);
        DCHECK_EQ(_cur_meta->stack, _main_stack);
        if (_cur_meta->tid != _main_tid) {
            // Not reached for bthreads that run on their own stack
            LOG(INFO) << "task_runner start";
            TaskGroup::task_runner(1/*skip remained*/);
            LOG(INFO) << "task_runner end";
        }
        LOG(INFO) << "cur_id:" << _cur_meta->tid << " targetid:" << tid << " main_id:" << _main_tid;
    }
    // Don't forget to add elapse of last wait_task.
    current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
}
void TaskGroup::task_runner(intptr_t skip_remained) {
    TaskGroup* g = tls_task_group;
    if (!skip_remained) {
        // After a bthread switch, run the leftover callback, which puts the
        // previous, unfinished bthread back into the run queue
        while (g->_last_context_remained) {
            LOG(INFO) << "taskgroup task_runner run _last_context_remained";
            RemainedFn fn = g->_last_context_remained;
            g->_last_context_remained = NULL;
            fn(g->_last_context_remained_arg);
            g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
        }
    }
    do {
        TaskMeta* const m = g->_cur_meta;
        void* thread_return;
        try {
            thread_return = m->fn(m->arg);
        } catch (ExitException& e) {
            thread_return = e.value();
        }
        g->_control->_nbthreads << -1;
        g->_control->tag_nbthreads(g->tag()) << -1;
        g->set_remained(TaskGroup::_release_last_context, m);
        // Schedules the next bthread; with nothing left to run, this
        // switches back to the main bthread (main_tid)
        ending_sched(&g);
    } while (g->_cur_meta->tid != g->_main_tid);
}
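The interplay of run_main_task and task_runner can be imitated in miniature. The sketch below swaps in POSIX ucontext for brpc's assembly context switch and uses a plain std::deque as the run queue. All names (Task, start, yield_now, trampoline, run_main_loop, run_scheduler_demo) are hypothetical: trampoline loosely plays the role of task_runner and run_main_loop the role of run_main_task. It ignores work stealing, remote queues and stack reuse, and assumes a Linux/glibc system.

```cpp
#include <ucontext.h>

#include <cassert>
#include <deque>
#include <string>
#include <vector>

namespace sched_sketch {

struct Task {
    ucontext_t ctx;            // saved registers of this task
    std::vector<char> stack;   // private stack of this task
    void (*fn)() = nullptr;    // user function
    bool done = false;
};

ucontext_t g_main;                 // context of the scheduling loop
Task* g_current = nullptr;         // task that is running right now
std::deque<Task*> g_run_queue;     // tasks that are ready to run
std::vector<std::string> g_log;

// Suspends the running task and puts it back at the end of the queue.
void yield_now() {
    Task* self = g_current;
    g_run_queue.push_back(self);
    swapcontext(&self->ctx, &g_main);
}

// Entry point of every task: runs the user function, then falls back
// to the scheduling loop through uc_link.
void trampoline() {
    Task* self = g_current;
    self->fn();
    self->done = true;
}

// Prepares a task's stack and context, then queues it. The Task must not
// be moved or copied afterwards, because the context refers to it.
void start(Task& t, void (*fn)()) {
    t.fn = fn;
    t.stack.resize(64 * 1024);
    int rc = getcontext(&t.ctx);
    assert(rc == 0);
    (void)rc;
    t.ctx.uc_stack.ss_sp = t.stack.data();
    t.ctx.uc_stack.ss_size = t.stack.size();
    t.ctx.uc_link = &g_main;
    makecontext(&t.ctx, trampoline, 0);
    g_run_queue.push_back(&t);
}

// Takes tasks off the queue and switches to them until none are left.
void run_main_loop() {
    while (!g_run_queue.empty()) {
        g_current = g_run_queue.front();
        g_run_queue.pop_front();
        swapcontext(&g_main, &g_current->ctx);  // returns on yield or finish
        g_current = nullptr;
    }
}

void task_a() { g_log.push_back("A1"); yield_now(); g_log.push_back("A2"); }
void task_b() { g_log.push_back("B1"); }

std::vector<std::string> run_scheduler_demo() {
    g_log.clear();
    Task a, b;
    start(a, task_a);
    start(b, task_b);
    run_main_loop();
    assert(a.done && b.done);
    return g_log;
}

}  // namespace sched_sketch
```

Task A yields after its first step, so B runs before A completes; control always comes back to the loop right after its swapcontext call, much as run_main_task continues after sched_to.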
Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit Moyin as the source when reposting.