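StarRocks Source Code Reading Series (2): How the Pipeline Profile Metrics Are Computed
Preface
This article summarizes a reading of the StarRocks 2.3.3 source code. Other versions may differ considerably, so treat it as a reference only. When StarRocks executes a SQL statement, the BE records in detail the resources used and the time spent at each stage, which is very helpful for optimizing slow queries and tuning cluster performance. Running set is_report_success = true enables profile reporting at the session level, after which the profile can be viewed in the FE web UI. However, even an ordinary SQL statement can produce a profile of several thousand to tens of thousands of lines, and people new to StarRocks may find it hard to spot what matters. This article traces, in the source code, how the metrics in the profile are computed. With that knowledge it should become easier to work out where the bottleneck of your own SQL lies.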
pipeline_driver
_total_timer = ADD_TIMER(_runtime_profile, "DriverTotalTime");
_active_timer = ADD_TIMER(_runtime_profile, "ActiveTime");
_overhead_timer = ADD_TIMER(_runtime_profile, "OverheadTime");
_schedule_timer = ADD_TIMER(_runtime_profile, "ScheduleTime");
_schedule_counter = ADD_COUNTER(_runtime_profile, "ScheduleCount", TUnit::UNIT);
_yield_by_time_limit_counter = ADD_COUNTER(_runtime_profile, "YieldByTimeLimit", TUnit::UNIT);
_block_by_precondition_counter = ADD_COUNTER(_runtime_profile, "BlockByPrecondition", TUnit::UNIT);
_block_by_output_full_counter = ADD_COUNTER(_runtime_profile, "BlockByOutputFull", TUnit::UNIT);
_block_by_input_empty_counter = ADD_COUNTER(_runtime_profile, "BlockByInputEmpty", TUnit::UNIT);
_pending_timer = ADD_TIMER(_runtime_profile, "PendingTime");
_precondition_block_timer = ADD_CHILD_TIMER(_runtime_profile, "PreconditionBlockTime", "PendingTime");
_input_empty_timer = ADD_CHILD_TIMER(_runtime_profile, "InputEmptyTime", "PendingTime");
_first_input_empty_timer = ADD_CHILD_TIMER(_runtime_profile, "FirstInputEmptyTime", "InputEmptyTime");
_followup_input_empty_timer = ADD_CHILD_TIMER(_runtime_profile, "FollowupInputEmptyTime", "InputEmptyTime");
_output_full_timer = ADD_CHILD_TIMER(_runtime_profile, "OutputFullTime", "PendingTime");
_pending_finish_timer = ADD_CHILD_TIMER(_runtime_profile, "PendingFinishTime", "PendingTime");
We start with the profile metrics of the pipeline engine's Driver, which are registered in the code above.
DriverTotalTime
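The total elapsed time of the Driver.
COUNTER_UPDATE(_total_timer, _total_timer_sw->elapsed_time());
This counter is updated in exactly one place, the finalize function, which runs when the driver has finished. It stores the elapsed time of the _total_timer_sw stopwatch into _total_timer. Next, let's look at where _total_timer_sw is started.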
_total_timer_sw = runtime_state->obj_pool()->add(new MonotonicStopWatch());
_pending_timer_sw = runtime_state->obj_pool()->add(new MonotonicStopWatch());
_precondition_block_timer_sw = runtime_state->obj_pool()->add(new MonotonicStopWatch());
_input_empty_timer_sw = runtime_state->obj_pool()->add(new MonotonicStopWatch());
_output_full_timer_sw = runtime_state->obj_pool()->add(new MonotonicStopWatch());
_pending_finish_timer_sw = runtime_state->obj_pool()->add(new MonotonicStopWatch());
_total_timer_sw->start();
_pending_timer_sw->start();
_precondition_block_timer_sw->start();
_input_empty_timer_sw->start();
_output_full_timer_sw->start();
_pending_finish_timer_sw->start();
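At the end of the prepare method, six stopwatches are started, including _total_timer_sw. This tells us the time range that DriverTotalTime covers: everything from prepare to finalize. It can be regarded as the whole lifecycle of the pipeline, that is, the pipeline's total running time.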
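The prepare-to-finalize bracket can be sketched with a small stopwatch built on std::chrono::steady_clock. This is only an illustration: SketchStopWatch and SketchDriver are hypothetical names, not the StarRocks MonotonicStopWatch or PipelineDriver classes, and the assumption that reset() simply restarts counting from "now" is mine.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical stand-in for a monotonic stopwatch (not the StarRocks class).
class SketchStopWatch {
public:
    void start() {
        _begin = std::chrono::steady_clock::now();
        _running = true;
    }
    // Assumption: reset() restarts counting from the current instant.
    void reset() { start(); }
    // Nanoseconds since the last start()/reset(); 0 if never started.
    int64_t elapsed_time() const {
        if (!_running) return 0;
        auto d = std::chrono::steady_clock::now() - _begin;
        return std::chrono::duration_cast<std::chrono::nanoseconds>(d).count();
    }

private:
    std::chrono::steady_clock::time_point _begin{};
    bool _running = false;
};

// Hypothetical driver that mirrors the prepare -> finalize bracket.
struct SketchDriver {
    SketchStopWatch total_sw;
    int64_t driver_total_time_ns = 0;

    void prepare() { total_sw.start(); }
    // The total is read once, when the driver is finalized.
    void finalize() { driver_total_time_ns += total_sw.elapsed_time(); }
};
```

Because the stopwatch is read only in finalize, everything that happens in between (running, waiting in queues, being blocked) ends up inside the total.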
ActiveTime
The time the task actually spends in the active (computing) state.
StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int worker_id) {
COUNTER_UPDATE(_schedule_counter, 1);
SCOPED_TIMER(_active_timer);
set_driver_state(DriverState::RUNNING);
size_t total_chunks_moved = 0;
size_t total_rows_moved = 0;
Inside the process method the time is measured with SCOPED_TIMER, so ActiveTime is the time the task spends inside the process function.
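The idea behind a scoped timer is RAII: the constructor records the start time and the destructor adds the elapsed time to a counter, so every exit path of the function is covered. The following is a minimal sketch under that assumption. SketchScopedTimer, SketchCounters and sketch_process are hypothetical names, not the StarRocks SCOPED_TIMER macro or PipelineDriver::process.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical RAII timer: adds the lifetime of the object to *counter_ns.
class SketchScopedTimer {
public:
    explicit SketchScopedTimer(int64_t* counter_ns)
            : _counter_ns(counter_ns), _begin(std::chrono::steady_clock::now()) {}
    ~SketchScopedTimer() {
        auto d = std::chrono::steady_clock::now() - _begin;
        *_counter_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(d).count();
    }
    SketchScopedTimer(const SketchScopedTimer&) = delete;
    SketchScopedTimer& operator=(const SketchScopedTimer&) = delete;

private:
    int64_t* _counter_ns;
    std::chrono::steady_clock::time_point _begin;
};

struct SketchCounters {
    int64_t schedule_count = 0;  // plays the role of ScheduleCount
    int64_t active_time_ns = 0;  // plays the role of ActiveTime
};

// Hypothetical process(): each call bumps the schedule count and adds
// its own wall-clock duration to active_time_ns when it returns.
int sketch_process(SketchCounters& c, int work_items) {
    c.schedule_count += 1;
    SketchScopedTimer timer(&c.active_time_ns);
    int sum = 0;
    for (int i = 0; i < work_items; ++i) sum += i;
    return sum;
}
```

Since the timer lives for the whole function body, ActiveTime in this sketch accumulates across calls, one slice per scheduling round.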
OverheadTime
void PipelineDriver::finalize(RuntimeState* runtime_state, DriverState state) {
VLOG_ROW << "[Driver] finalize, driver=" << this;
DCHECK(state == DriverState::FINISH || state == DriverState::CANCELED || state == DriverState::INTERNAL_ERROR);
_close_operators(runtime_state);
set_driver_state(state);
COUNTER_UPDATE(_total_timer, _total_timer_sw->elapsed_time());
COUNTER_UPDATE(_schedule_timer, _total_timer->value() - _active_timer->value() - _pending_timer->value());
_update_overhead_timer();
First, note that finalize calls _update_overhead_timer().
void PipelineDriver::_update_overhead_timer() {
int64_t overhead_time = _active_timer->value();
RuntimeProfile* profile = _runtime_profile.get();
std::vector<RuntimeProfile*> children;
profile->get_children(&children);
for (auto* child_profile : children) {
auto* total_timer = child_profile->get_counter("OperatorTotalTime");
if (total_timer != nullptr) {
overhead_time -= total_timer->value();
}
}
COUNTER_UPDATE(_overhead_timer, overhead_time);
}
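As the code shows, overhead_timer is active_timer minus the sum of every operator's OperatorTotalTime. In other words, it is the part of the active time that was not spent inside any operator, the time that elapsed in the steps between operators.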
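As a worked example of that subtraction, here is a small sketch. The function name sketch_overhead_time and the sample numbers are illustrative only.

```cpp
#include <cstdint>
#include <vector>

// OverheadTime = ActiveTime - sum of each child operator's OperatorTotalTime.
int64_t sketch_overhead_time(int64_t active_time_ns,
                             const std::vector<int64_t>& operator_total_times_ns) {
    int64_t overhead = active_time_ns;
    for (int64_t t : operator_total_times_ns) {
        overhead -= t;
    }
    return overhead;
}
```

With an ActiveTime of 100 and two operators that report 30 and 50, the overhead is 20. A driver with no operator children would report its whole ActiveTime as overhead.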
YieldByTimeLimit
The number of times the pipeline was interrupted (yielded) because of the time limit.
if (time_spent >= YIELD_MAX_TIME_SPENT) {
should_yield = true;
COUNTER_UPDATE(_yield_by_time_limit_counter, time_spent >= YIELD_MAX_TIME_SPENT);
break;
}
if (_workgroup != nullptr && time_spent >= YIELD_PREEMPT_MAX_TIME_SPENT &&
_workgroup->driver_sched_entity()->in_queue()->should_yield(this, time_spent)) {
should_yield = true;
COUNTER_UPDATE(_yield_by_time_limit_counter, time_spent >= YIELD_MAX_TIME_SPENT);
break;
}
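time_spent is the time spent in process so far. YIELD_MAX_TIME_SPENT is 100 ms and YIELD_PREEMPT_MAX_TIME_SPENT is 5 ms. The check is made after each operator in the pipeline's operator chain finishes executing.
The first check, time_spent >= YIELD_MAX_TIME_SPENT, tests whether the accumulated time has reached 100 ms. If it has, yield_by_time_limit_counter is incremented by 1 and the loop exits.
If 100 ms has not been reached, the second check applies. It has three parts:
- _workgroup != nullptr: whether a resource group has been assigned.
- time_spent >= YIELD_PREEMPT_MAX_TIME_SPENT: whether the pipeline's accumulated execution time has reached 5 ms.
- should_yield(this, time_spent): the source of this function is shown below. Put simply, it checks whether the throttling threshold has been reached (I covered the throttling check in the previous article). If the driver is not throttled, it then checks whether the current work group is the one with the shortest running time. If it is not, the driver exits and gives up its resources.
There is a bug here. The amount added to the counter in the second branch is time_spent >= YIELD_MAX_TIME_SPENT, which is exactly the condition of the first if. If that condition were true, the first if would already have matched and exited, and execution would never reach the second branch. The expression is therefore always false there, and the counter is never incremented. I submitted a PR to change the expression to time_spent >= YIELD_PREEMPT_MAX_TIME_SPENT. It could be even simpler: since the decision to yield has already been made, hard-coding 1 would do. (The fix has since been merged into the main branch.)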
bool WorkGroupDriverQueue::should_yield(const DriverRawPtr driver, int64_t unaccounted_runtime_ns) const {
if (_throttled(driver->workgroup()->driver_sched_entity(), unaccounted_runtime_ns)) {
return true;
}
auto* wg_entity = driver->workgroup()->driver_sched_entity();
return _min_wg_entity.load() != wg_entity &&
_min_vruntime_ns.load() < wg_entity->vruntime_ns() + unaccounted_runtime_ns / wg_entity->cpu_limit();
}
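To make the counter bug described above concrete, the sketch below reduces the two branches to a pure function. All names are hypothetical. The parameter wg_wants_yield stands in for the combined result of `_workgroup != nullptr` and `should_yield(this, time_spent)`, and the constants use the 100 ms and 5 ms values quoted in the text.

```cpp
#include <cstdint>

const int64_t kYieldMaxTimeSpentNs = 100LL * 1000 * 1000;       // 100 ms
const int64_t kYieldPreemptMaxTimeSpentNs = 5LL * 1000 * 1000;  // 5 ms

struct SketchYieldResult {
    bool should_yield;
    int64_t counter_delta;  // amount added to YieldByTimeLimit
};

// Mirrors the logic as quoted in the article, including the bug.
SketchYieldResult sketch_yield_as_written(int64_t time_spent_ns, bool wg_wants_yield) {
    if (time_spent_ns >= kYieldMaxTimeSpentNs) {
        return {true, (time_spent_ns >= kYieldMaxTimeSpentNs) ? 1 : 0};
    }
    if (wg_wants_yield && time_spent_ns >= kYieldPreemptMaxTimeSpentNs) {
        // Same expression as the first branch: always false here, so 0 is added.
        return {true, (time_spent_ns >= kYieldMaxTimeSpentNs) ? 1 : 0};
    }
    return {false, 0};
}

// One possible fix, as suggested in the article: add 1 once the yield is decided.
SketchYieldResult sketch_yield_fixed(int64_t time_spent_ns, bool wg_wants_yield) {
    if (time_spent_ns >= kYieldMaxTimeSpentNs) {
        return {true, 1};
    }
    if (wg_wants_yield && time_spent_ns >= kYieldPreemptMaxTimeSpentNs) {
        return {true, 1};
    }
    return {false, 0};
}
```

For a driver that has run 10 ms and is preempted by its work group, the as-written version yields but adds 0 to the counter, while the fixed version adds 1. On an affected build, YieldByTimeLimit therefore undercounts preemption yields.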
BlockByPrecondition
The number of times the pipeline stopped running because its preconditions were not ready.
if (num_chunks_moved == 0 || should_yield) {
if (is_precondition_block()) {
set_driver_state(DriverState::PRECONDITION_BLOCK);
COUNTER_UPDATE(_block_by_precondition_counter, 1);
} else if (!sink_operator()->is_finished() && !sink_operator()->need_input()) {
set_driver_state(DriverState::OUTPUT_FULL);
COUNTER_UPDATE(_block_by_output_full_counter, 1);
} else if (!source_operator()->is_finished() && !source_operator()->has_output()) {
set_driver_state(DriverState::INPUT_EMPTY);
COUNTER_UPDATE(_block_by_input_empty_counter, 1);
} else {
set_driver_state(DriverState::READY);
}
return _state;
}
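The outermost condition is num_chunks_moved == 0 || should_yield, which covers two cases. num_chunks_moved == 0 means that no chunks were moved, so there is no data left to compute for now. should_yield means the driver may be interrupted: either the pipeline has already run for 100 ms, or the current work group has run for 5 ms and a higher-priority work group wants to run.
The inner check is is_precondition_block().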
bool is_precondition_block() {
if (!_wait_global_rf_ready) {
if (dependencies_block() || local_rf_block()) {
return true;
}
_wait_global_rf_ready = true;
if (_global_rf_descriptors.empty()) {
return false;
}
_global_rf_wait_timeout_ns += _precondition_block_timer_sw->elapsed_time();
return global_rf_block();
} else {
return global_rf_block();
}
}
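According to the comments on this function, it checks whether the hash table and the local runtime filters are ready. The code above also shows that, once those are ready, the driver can still be held back by global_rf_block().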
BlockByOutputFull
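The number of times the pipeline stopped running because its output was full. The condition is !sink_operator()->is_finished() && !sink_operator()->need_input(). Put simply, it checks whether the sink operator currently needs no input data although it has not yet reached the finished state. If so, the driver is paused for the time being.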
BlockByInputEmpty
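The number of times the pipeline stopped running because there was no input data. The condition is !source_operator()->is_finished() && !source_operator()->has_output(). Put simply, the source operator has not reached the finished state but has nothing to output, possibly because the upstream pipeline has not finished computing yet.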
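The order of the three checks matters, because only the first matching branch increments its counter. The sketch below restates the branch order from the process code as a pure function. SketchState, SketchDriverView and sketch_next_state are hypothetical names, and the boolean fields stand in for the operator calls in the original.

```cpp
enum class SketchState { PRECONDITION_BLOCK, OUTPUT_FULL, INPUT_EMPTY, READY };

// Snapshot of the values the original code obtains from operator calls.
struct SketchDriverView {
    bool precondition_block;  // is_precondition_block()
    bool sink_finished;       // sink_operator()->is_finished()
    bool sink_need_input;     // sink_operator()->need_input()
    bool source_finished;     // source_operator()->is_finished()
    bool source_has_output;   // source_operator()->has_output()
};

// Same branch order as in the article: precondition, then sink, then source.
SketchState sketch_next_state(const SketchDriverView& d) {
    if (d.precondition_block) {
        return SketchState::PRECONDITION_BLOCK;
    }
    if (!d.sink_finished && !d.sink_need_input) {
        return SketchState::OUTPUT_FULL;
    }
    if (!d.source_finished && !d.source_has_output) {
        return SketchState::INPUT_EMPTY;
    }
    return SketchState::READY;
}
```

One consequence: if the sink cannot accept input and the source has nothing to output at the same moment, the round is counted as BlockByOutputFull, not BlockByInputEmpty.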
PendingTime
First, the prepare function calls _pending_timer_sw->start(); which starts the stopwatch.
void PipelineDriverPoller::add_blocked_driver(const DriverRawPtr driver) {
std::unique_lock<std::mutex> lock(_mutex);
_blocked_drivers.push_back(driver);
driver->_pending_timer_sw->reset();
_cond.notify_one();
}
void PipelineDriverPoller::remove_blocked_driver(DriverList& local_blocked_drivers, DriverList::iterator& driver_it) {
auto& driver = *driver_it;
driver->_pending_timer->update(driver->_pending_timer_sw->elapsed_time());
local_blocked_drivers.erase(driver_it++);
}
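Then add_blocked_driver resets the stopwatch, and remove_blocked_driver adds the measured time to _pending_timer. Next, let's see where these two functions are called.
add_blocked_driver
First, under what circumstances is the stopwatch reset? There are five call sites.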
if (fragment_ctx->is_canceled()) {
driver->cancel_operators(runtime_state);
if (driver->is_still_pending_finish()) {
driver->set_driver_state(DriverState::PENDING_FINISH);
_blocked_driver_poller->add_blocked_driver(driver);
} else {
_finalize_driver(driver, runtime_state, DriverState::CANCELED);
}
continue;
}
The first is when the query fragment is canceled, which usually happens when the query times out.
if (!status.ok()) {
LOG(WARNING) << "[Driver] Process error, query_id=" << print_id(driver->query_ctx()->query_id())
<< ", instance_id=" << print_id(driver->fragment_ctx()->fragment_instance_id())
<< ", status=" << status;
query_ctx->cancel(status);
driver->cancel_operators(runtime_state);
if (driver->is_still_pending_finish()) {
driver->set_driver_state(DriverState::PENDING_FINISH);
_blocked_driver_poller->add_blocked_driver(driver);
} else {
_finalize_driver(driver, runtime_state, DriverState::INTERNAL_ERROR);
}
continue;
}
The second is when the query fails. There can be many causes, such as a problem in the BE itself or on the host machine.
case PRECONDITION_BLOCK: {
_blocked_driver_poller->add_blocked_driver(driver);
break;
}
The third is when the pipeline engine's preconditions are not ready.
void GlobalDriverExecutor::submit(DriverRawPtr driver) {
if (driver->is_precondition_block()) {
driver->set_driver_state(DriverState::PRECONDITION_BLOCK);
driver->mark_precondition_not_ready();
this->_blocked_driver_poller->add_blocked_driver(driver);
} else {
driver->submit_operators();
if (!driver->source_operator()->is_finished() && !driver->source_operator()->has_output()) {
driver->set_driver_state(DriverState::INPUT_EMPTY);
this->_blocked_driver_poller->add_blocked_driver(driver);
} else {
this->_driver_queue->put_back(driver);
}
}
}
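The fourth and fifth sit together in the submit function: one when the preconditions are not ready, the other when the source operator has not finished and has no data to output.
To summarize the five cases: in the first two the query has failed, so the PendingTime statistic is no longer meaningful. The third is the normal case. The stopwatch is reset while the driver waits for the hash table to be built, so that _pending_timer covers only the interval between the calls to add_blocked_driver and remove_blocked_driver. The last two need no special attention either. At submit time the operator is simply not yet in the finished state, probably because no data has been scanned yet.
remove_blocked_driver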
while (driver_it != local_blocked_drivers.end()) {
auto* driver = *driver_it;
if (driver->query_ctx()->is_query_expired()) {
LOG(WARNING) << "[Driver] Timeout, query_id=" << print_id(driver->query_ctx()->query_id())
<< ", instance_id=" << print_id(driver->fragment_ctx()->fragment_instance_id());
driver->fragment_ctx()->cancel(Status::TimedOut(fmt::format(
"Query exceeded time limit of {} seconds", driver->query_ctx()->get_query_expire_seconds())));
driver->cancel_operators(driver->fragment_ctx()->runtime_state());
if (driver->is_still_pending_finish()) {
driver->set_driver_state(DriverState::PENDING_FINISH);
++driver_it;
} else {
driver->set_driver_state(DriverState::FINISH);
remove_blocked_driver(local_blocked_drivers, driver_it);
ready_drivers.emplace_back(driver);
}
} else if (driver->fragment_ctx()->is_canceled()) {
driver->cancel_operators(driver->fragment_ctx()->runtime_state());
if (driver->is_still_pending_finish()) {
driver->set_driver_state(DriverState::PENDING_FINISH);
++driver_it;
} else {
driver->set_driver_state(DriverState::CANCELED);
remove_blocked_driver(local_blocked_drivers, driver_it);
ready_drivers.emplace_back(driver);
}
} else if (driver->pending_finish()) {
if (driver->is_still_pending_finish()) {
++driver_it;
} else {
driver->set_driver_state(driver->fragment_ctx()->is_canceled() ? DriverState::CANCELED
: DriverState::FINISH);
remove_blocked_driver(local_blocked_drivers, driver_it);
ready_drivers.emplace_back(driver);
}
} else if (driver->is_finished()) {
remove_blocked_driver(local_blocked_drivers, driver_it);
ready_drivers.emplace_back(driver);
} else if (driver->is_not_blocked()) {
driver->set_driver_state(DriverState::READY);
remove_blocked_driver(local_blocked_drivers, driver_it);
ready_drivers.emplace_back(driver);
} else {
++driver_it;
}
}
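remove_blocked_driver is called in five places:
- The query has timed out and is_still_pending_finish() is false. (is_still_pending_finish holds while the source or sink operator is in the pending-finish state.)
- Fragment execution has been canceled and is_still_pending_finish() is false.
- The driver is in the PENDING_FINISH state, but neither the source nor the sink is still pending finish.
- The driver has reached the finished state.
- The driver's is_not_blocked() returns true. Since the driver is then set to READY, this can be read as the blocking condition having cleared.
A driver enters the PENDING_FINISH state in three cases:
- In the process function, when the sink operator has reached the finished state but the source operator is still pending finish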
if (sink_operator()->is_finished()) {
finish_operators(runtime_state);
set_driver_state(is_still_pending_finish() ? DriverState::PENDING_FINISH : DriverState::FINISH);
return _state;
}
- In the check_short_circuit function, when the sink operator has finished but the source operator is still pending finish
void PipelineDriver::check_short_circuit() {
int last_finished = -1;
for (int i = _first_unfinished; i < _operators.size() - 1; i++) {
if (_operators[i]->is_finished()) {
last_finished = i;
}
}
if (last_finished == -1) {
return;
}
_mark_operator_finishing(_operators[last_finished + 1], _runtime_state);
for (auto i = _first_unfinished; i <= last_finished; ++i) {
_mark_operator_finished(_operators[i], _runtime_state);
}
_first_unfinished = last_finished + 1;
if (sink_operator()->is_finished()) {
finish_operators(_runtime_state);
set_driver_state(is_still_pending_finish() ? DriverState::PENDING_FINISH : DriverState::FINISH);
}
}
- When the fragment is canceled
bool PipelineDriver::_check_fragment_is_canceled(RuntimeState* runtime_state) {
if (_fragment_ctx->is_canceled()) {
cancel_operators(runtime_state);
if (is_still_pending_finish()) {
set_driver_state(DriverState::PENDING_FINISH);
} else {
set_driver_state(_fragment_ctx->final_status().ok() ? DriverState::FINISH : DriverState::CANCELED);
}
return true;
}
return false;
}
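Setting aside all the abnormal cases, PendingTime can be understood as the time from when the driver starts getting ready to receive data until the Source operator finishes running.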
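The reset-on-add and update-on-remove pairing seen in add_blocked_driver and remove_blocked_driver can be sketched with a manually advanced clock, which keeps the example deterministic. All names here are hypothetical, and the sketch assumes that the timer's update() adds its argument to the running total.

```cpp
#include <cstdint>

// Manually advanced clock; the real code reads a monotonic stopwatch instead.
struct SketchClock {
    int64_t now_ns = 0;
};

struct SketchPendingDriver {
    int64_t pending_since_ns = 0;  // plays the role of _pending_timer_sw
    int64_t pending_time_ns = 0;   // plays the role of _pending_timer
};

// Entering the blocked list restarts the stopwatch.
void sketch_add_blocked_driver(const SketchClock& clock, SketchPendingDriver& d) {
    d.pending_since_ns = clock.now_ns;
}

// Leaving the blocked list adds the time spent there to the total.
void sketch_remove_blocked_driver(const SketchClock& clock, SketchPendingDriver& d) {
    d.pending_time_ns += clock.now_ns - d.pending_since_ns;
}
```

If a driver is blocked from t=10 to t=25, runs until t=100, and is blocked again until t=130, this sketch reports a PendingTime of 45. The running interval in between is not included.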
InputEmptyTime
A child metric of PendingTime: the time during which no input data was available. If this value is large, the upstream pipeline may not be sending data to this pipeline in time.
FollowupInputEmptyTime
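This is the time the Driver spends in the INPUT_EMPTY state after the first wait for input has already been recorded, that is, after the first data has arrived. It is usually small.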
switch (_state) {
case DriverState::INPUT_EMPTY: {
auto elapsed_time = _input_empty_timer_sw->elapsed_time();
if (_first_input_empty_timer->value() == 0) {
_first_input_empty_timer->update(elapsed_time);
} else {
_followup_input_empty_timer->update(elapsed_time);
}
_input_empty_timer->update(elapsed_time);
break;
}
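The split between the first wait and the later waits can be sketched as follows. The names are hypothetical, and the sketch assumes that update() on a timer adds its argument to the current value.

```cpp
#include <cstdint>

struct SketchInputEmptyTimers {
    int64_t first_ns = 0;     // FirstInputEmptyTime
    int64_t followup_ns = 0;  // FollowupInputEmptyTime
    int64_t total_ns = 0;     // InputEmptyTime
};

// Called when the driver leaves INPUT_EMPTY after waiting elapsed_ns.
void sketch_leave_input_empty(SketchInputEmptyTimers& t, int64_t elapsed_ns) {
    if (t.first_ns == 0) {
        t.first_ns += elapsed_ns;  // nothing recorded yet: this is the first wait
    } else {
        t.followup_ns += elapsed_ns;  // every later wait
    }
    t.total_ns += elapsed_ns;  // the parent metric always grows
}
```

After waits of 40, 5 and 7 the sketch holds first = 40, followup = 12 and total = 52, so InputEmptyTime is the sum of its two children.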
OutputFullTime
The time the Driver spends in the OUTPUT_FULL state.
void set_driver_state(DriverState state) {
if (state == _state) {
return;
}
switch (_state) {
case DriverState::INPUT_EMPTY: {
auto elapsed_time = _input_empty_timer_sw->elapsed_time();
if (_first_input_empty_timer->value() == 0) {
_first_input_empty_timer->update(elapsed_time);
} else {
_followup_input_empty_timer->update(elapsed_time);
}
_input_empty_timer->update(elapsed_time);
break;
}
case DriverState::OUTPUT_FULL:
_output_full_timer->update(_output_full_timer_sw->elapsed_time());
break;
case DriverState::PRECONDITION_BLOCK:
_precondition_block_timer->update(_precondition_block_timer_sw->elapsed_time());
break;
case DriverState::PENDING_FINISH:
_pending_finish_timer->update(_pending_finish_timer_sw->elapsed_time());
break;
default:
break;
}
switch (state) {
case DriverState::INPUT_EMPTY:
_input_empty_timer_sw->reset();
break;
case DriverState::OUTPUT_FULL:
_output_full_timer_sw->reset();
break;
case DriverState::PRECONDITION_BLOCK:
_precondition_block_timer_sw->reset();
break;
case DriverState::PENDING_FINISH:
_pending_finish_timer_sw->reset();
break;
default:
break;
}
_state = state;
}
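The state transition function shows that the stopwatch is reset and starts counting whenever the state changes from any other state to OUTPUT_FULL, and that its elapsed time is recorded whenever the state changes from OUTPUT_FULL to any other state. This metric is therefore the time the Driver stays in the OUTPUT_FULL state.
There is only one way into this state: the sink operator has not finished running but does not need any input (need_input() is false). If this value is high, the cause is worth investigating. One possibility is that the downstream buffer is full.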
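The pattern in set_driver_state is "on leaving a state, record how long we were in it; on entering a state, restart the clock". Below is a simplified sketch of that pattern. Unlike the original, which keeps one stopwatch per blocking state, it uses a single entry timestamp and a map that covers every state, and it takes the current time as a parameter so the behavior is deterministic. All names are hypothetical.

```cpp
#include <cstdint>
#include <map>

enum class SketchDriverState {
    READY, RUNNING, INPUT_EMPTY, OUTPUT_FULL, PRECONDITION_BLOCK, PENDING_FINISH
};

class SketchStateTimer {
public:
    // Records the time spent in the state being left, then enters `next`.
    void set_state(SketchDriverState next, int64_t now_ns) {
        if (next == _state) {
            return;  // same early return as in the original function
        }
        _time_in_state_ns[_state] += now_ns - _entered_ns;
        _entered_ns = now_ns;
        _state = next;
    }

    // Accumulated nanoseconds spent in state `s` over completed stays.
    int64_t time_in(SketchDriverState s) const {
        auto it = _time_in_state_ns.find(s);
        return it == _time_in_state_ns.end() ? 0 : it->second;
    }

private:
    SketchDriverState _state = SketchDriverState::READY;
    int64_t _entered_ns = 0;
    std::map<SketchDriverState, int64_t> _time_in_state_ns;
};
```

A stay that has not ended yet contributes nothing until the next transition, which matches the original: the timers are only updated when the driver leaves the state.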
PreconditionBlockTime
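The time the pipeline engine spends waiting for the hash table and the local runtime filters to be prepared. The source walkthrough will be added later.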
FirstInputEmptyTime
The time from when the pipeline engine has finished loading until the first data arrives. The source walkthrough will be added later.
PendingFinishTime
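The time the pipeline engine is stuck in the PendingFinish stage. In this stage the sink operator has finished outputting its data but the source operator has not yet closed, generally because the IO threads are too busy. The source walkthrough will be added later.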
ScheduleTime
COUNTER_UPDATE(_schedule_timer, _total_timer->value() - _active_timer->value() - _pending_timer->value());
The calculation is simple: TotalTime - ActiveTime - PendingTime. All three have been covered above.
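As a small worked example of that formula (the function name and the numbers are illustrative only):

```cpp
#include <cstdint>

// ScheduleTime = DriverTotalTime - ActiveTime - PendingTime.
int64_t sketch_schedule_time(int64_t total_ns, int64_t active_ns, int64_t pending_ns) {
    return total_ns - active_ns - pending_ns;
}
```

For a driver with a total of 1000, an ActiveTime of 300 and a PendingTime of 600, the ScheduleTime is 100. This is the remainder of the driver's lifetime in which it was neither executing in process nor sitting in the blocked list.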