imay opened a new issue #1827: DataStreamSender close channel serially
URL: https://github.com/apache/incubator-doris/issues/1827
 
 
   Currently, DataStreamSender in Doris closes all channels serially, which keeps the receivers waiting for their results. As a result, concurrent queries cannot use all of the available CPU resources.
   The stacks look like the following:
   sender:
   ```
   #0  0x00007fef649f7469 in syscall () from /lib64/libc.so.6
   #1  0x00000000025523e6 in futex_wait_private (timeout=0x0, expected=0, 
addr1=0x7fe7c3a73b60) at 
/var/local/incubator-doris/thirdparty/src/incubator-brpc-0.9.5/src/bthread/sys_futex.h:37
   #2  bthread::wait_pthread (pw=..., ptimeout=ptimeout@entry=0x0) at 
/var/local/incubator-doris/thirdparty/src/incubator-brpc-0.9.5/src/bthread/butex.cpp:138
   #3  0x0000000002552ec0 in butex_wait_from_pthread (abstime=0x0, 
expected_value=633, b=0x97036340, g=<optimized out>) at 
/var/local/incubator-doris/thirdparty/src/incubator-brpc-0.9.5/src/bthread/butex.cpp:585
   #4  bthread::butex_wait (arg=arg@entry=0x97036340, 
expected_value=expected_value@entry=633, abstime=abstime@entry=0x0) at 
/var/local/incubator-doris/thirdparty/src/incubator-brpc-0.9.5/src/bthread/butex.cpp:618
   #5  0x000000000255c5b8 in bthread_id_join (id=...) at 
/var/local/incubator-doris/thirdparty/src/incubator-brpc-0.9.5/src/bthread/id.cpp:526
   #6  0x00000000023e37ba in brpc::Join (id=...) at 
/var/local/incubator-doris/thirdparty/src/incubator-brpc-0.9.5/src/brpc/controller.cpp:527
   #7  0x000000000159692f in doris::DataStreamSender::Channel::_wait_last_brpc 
(this=this@entry=0x6d5df760) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/data_stream_sender.cpp:129
   #8  0x0000000001595843 in doris::DataStreamSender::Channel::close_internal 
(this=this@entry=0x6d5df760) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/data_stream_sender.cpp:286
   #9  0x0000000001595a80 in close (state=0x1e92f800, this=0x6d5df760) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/data_stream_sender.cpp:292
   #10 doris::DataStreamSender::close (this=0x56c42680, state=0x1e92f800, 
exec_status=...) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/data_stream_sender.cpp:595
   #11 0x0000000001033303 in doris::PlanFragmentExecutor::open_internal 
(this=this@entry=0x3e4848b0) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/plan_fragment_executor.cpp:349
   #12 0x0000000001033504 in doris::PlanFragmentExecutor::open 
(this=this@entry=0x3e4848b0) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/plan_fragment_executor.cpp:282
   #13 0x0000000000fc3ae7 in doris::FragmentExecState::execute 
(this=0x3e484840) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/fragment_mgr.cpp:208
   #14 0x0000000000fc5736 in 
doris::FragmentMgr::exec_actual(std::shared_ptr<doris::FragmentExecState>, 
std::function<void (doris::PlanFragmentExecutor*)>) (this=0x8243880, 
exec_state=..., cb=...) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/fragment_mgr.cpp:387
   #15 0x0000000000fcba88 in __invoke_impl<void, void 
(doris::FragmentMgr::*&)(std::shared_ptr<doris::FragmentExecState>, 
std::function<void(doris::PlanFragmentExecutor*)>), doris::FragmentMgr*&, 
std::shared_ptr<doris::FragmentExecState>&, 
std::function<void(doris::PlanFragmentExecutor*)>&> (__t=@0x19ab29f0: 
0x8243880, __f=@0x19ab29b0: (void (doris::FragmentMgr::*)(doris::FragmentMgr * 
const, std::shared_ptr<doris::FragmentExecState>, 
std::function<void(doris::PlanFragmentExecutor*)>)) 0xfc5710 
<doris::FragmentMgr::exec_actual(std::shared_ptr<doris::FragmentExecState>, 
std::function<void (doris::PlanFragmentExecutor*)>)>) at 
/usr/include/c++/7.3.0/bits/invoke.h:73
   #16 __invoke<void 
(doris::FragmentMgr::*&)(std::shared_ptr<doris::FragmentExecState>, 
std::function<void(doris::PlanFragmentExecutor*)>), doris::FragmentMgr*&, 
std::shared_ptr<doris::FragmentExecState>&, 
std::function<void(doris::PlanFragmentExecutor*)>&> (__fn=@0x19ab29b0: (void 
(doris::FragmentMgr::*)(doris::FragmentMgr * const, 
std::shared_ptr<doris::FragmentExecState>, 
std::function<void(doris::PlanFragmentExecutor*)>)) 0xfc5710 
<doris::FragmentMgr::exec_actual(std::shared_ptr<doris::FragmentExecState>, 
std::function<void (doris::PlanFragmentExecutor*)>)>) at 
/usr/include/c++/7.3.0/bits/invoke.h:95
   #17 __call<void, 0, 1, 2> (__args=..., this=0x19ab29b0) at 
/usr/include/c++/7.3.0/functional:632
   #18 operator()<> (this=0x19ab29b0) at /usr/include/c++/7.3.0/functional:718
   #19 
boost::detail::function::void_function_obj_invoker0<std::_Bind_result<void, 
void (doris::FragmentMgr::*(doris::FragmentMgr*, 
std::shared_ptr<doris::FragmentExecState>, std::function<void 
(doris::PlanFragmentExecutor*)>))(std::shared_ptr<doris::FragmentExecState>, 
std::function<void (doris::PlanFragmentExecutor*)>)>, 
void>::invoke(boost::detail::function::function_buffer&) (function_obj_ptr=...) 
at 
/var/local/thirdparty/installed/include/boost/function/function_template.hpp:159
   #20 0x0000000000fc4ed4 in operator() (this=0x32ad4320) at 
/var/local/thirdparty/installed/include/boost/function/function_template.hpp:759
   #21 doris::fragment_executor (param=0x32ad4320) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/fragment_mgr.cpp:412
   #22 0x00007fef646f1dc5 in start_thread () from /lib64/libpthread.so.0
   #23 0x00007fef649fcced in clone () from /lib64/libc.so.6
   ```
   receiver:
   ```
   #0  0x00007fef646f56d5 in pthread_cond_wait@@GLIBC_2.3.2 () from 
/lib64/libpthread.so.0
   #1  0x000000000105c05f in wait (m=..., this=0x3f6fc3f8) at 
/var/local/thirdparty/installed/include/boost/thread/pthread/condition_variable.hpp:76
   #2  doris::DataStreamRecvr::SenderQueue::get_batch (this=0x3f6fc3c0, 
next_batch=next_batch@entry=0x7ddf7e30) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/data_stream_recvr.cc:153
   #3  0x000000000105c80b in doris::DataStreamRecvr::get_batch (this=<optimized 
out>, next_batch=next_batch@entry=0x7ddf7e30) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/data_stream_recvr.cc:436
   #4  0x00000000013bd8f6 in doris::ExchangeNode::fill_input_row_batch 
(this=this@entry=0x7ddf7c00, state=state@entry=0x11b14300) at 
/root/incubator-doris-apache/incubator-doris/be/src/exec/exchange_node.cpp:122
   #5  0x00000000013bf3e6 in doris::ExchangeNode::open (this=0x7ddf7c00, 
state=0x11b14300) at 
/root/incubator-doris-apache/incubator-doris/be/src/exec/exchange_node.cpp:92
   #6  0x0000000001486c97 in doris::NewPartitionedAggregationNode::open 
(this=0x42691b80, state=0x11b14300) at 
/root/incubator-doris-apache/incubator-doris/be/src/exec/new_partitioned_aggregation_node.cc:257
   #7  0x0000000001032ddb in doris::PlanFragmentExecutor::open_internal 
(this=this@entry=0x4909ee30) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/plan_fragment_executor.cpp:298
   #8  0x0000000001033504 in doris::PlanFragmentExecutor::open 
(this=this@entry=0x4909ee30) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/plan_fragment_executor.cpp:282
   #9  0x0000000000fc3ae7 in doris::FragmentExecState::execute 
(this=0x4909edc0) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/fragment_mgr.cpp:208
   #10 0x0000000000fc5736 in 
doris::FragmentMgr::exec_actual(std::shared_ptr<doris::FragmentExecState>, 
std::function<void (doris::PlanFragmentExecutor*)>) (this=0x8243880, 
exec_state=..., cb=...) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/fragment_mgr.cpp:387
   #11 0x0000000000fcba88 in __invoke_impl<void, void 
(doris::FragmentMgr::*&)(std::shared_ptr<doris::FragmentExecState>, 
std::function<void(doris::PlanFragmentExecutor*)>), doris::FragmentMgr*&, 
std::shared_ptr<doris::FragmentExecState>&, 
std::function<void(doris::PlanFragmentExecutor*)>&> (__t=@0x7a67b670: 
0x8243880, __f=@0x7a67b630: (void (doris::FragmentMgr::*)(doris::FragmentMgr * 
const, std::shared_ptr<doris::FragmentExecState>, 
std::function<void(doris::PlanFragmentExecutor*)>)) 0xfc5710 
<doris::FragmentMgr::exec_actual(std::shared_ptr<doris::FragmentExecState>, 
std::function<void (doris::PlanFragmentExecutor*)>)>) at 
/usr/include/c++/7.3.0/bits/invoke.h:73
   #12 __invoke<void 
(doris::FragmentMgr::*&)(std::shared_ptr<doris::FragmentExecState>, 
std::function<void(doris::PlanFragmentExecutor*)>), doris::FragmentMgr*&, 
std::shared_ptr<doris::FragmentExecState>&, 
std::function<void(doris::PlanFragmentExecutor*)>&> (__fn=@0x7a67b630: (void 
(doris::FragmentMgr::*)(doris::FragmentMgr * const, 
std::shared_ptr<doris::FragmentExecState>, 
std::function<void(doris::PlanFragmentExecutor*)>)) 0xfc5710 
<doris::FragmentMgr::exec_actual(std::shared_ptr<doris::FragmentExecState>, 
std::function<void (doris::PlanFragmentExecutor*)>)>) at 
/usr/include/c++/7.3.0/bits/invoke.h:95
   #13 __call<void, 0, 1, 2> (__args=..., this=0x7a67b630) at 
/usr/include/c++/7.3.0/functional:632
   #14 operator()<> (this=0x7a67b630) at /usr/include/c++/7.3.0/functional:718
   #15 
boost::detail::function::void_function_obj_invoker0<std::_Bind_result<void, 
void (doris::FragmentMgr::*(doris::FragmentMgr*, 
std::shared_ptr<doris::FragmentExecState>, std::function<void 
(doris::PlanFragmentExecutor*)>))(std::shared_ptr<doris::FragmentExecState>, 
std::function<void (doris::PlanFragmentExecutor*)>)>, 
void>::invoke(boost::detail::function::function_buffer&) (function_obj_ptr=...) 
at 
/var/local/thirdparty/installed/include/boost/function/function_template.hpp:159
   #16 0x0000000000fc4ed4 in operator() (this=0x2598ad20) at 
/var/local/thirdparty/installed/include/boost/function/function_template.hpp:759
   #17 doris::fragment_executor (param=0x2598ad20) at 
/root/incubator-doris-apache/incubator-doris/be/src/runtime/fragment_mgr.cpp:412
   #18 0x00007fef646f1dc5 in start_thread () from /lib64/libpthread.so.0
   #19 0x00007fef649fcced in clone () from /lib64/libc.so.6
   ```
   
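   The sender is blocked in `Channel::_wait_last_brpc` while closing one channel, so the remaining channels are not closed until that RPC returns, and their receivers stay blocked in `SenderQueue::get_batch`. So I will split close into two phases:
   
   1. Close all channels without waiting.
   2. Wait for the channels one by one.
   
   This is related to #1726.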
