Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1864160788 A better approach is available; see #2358. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn closed pull request #2215: add usercode thread pool for method URL: https://github.com/apache/brpc/pull/2215
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1683309772 #2358
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1605894348 This now conflicts with the main branch. @yanglimingcn
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1573313057 It would be best to implement a lock-free queue for this part later; performance should be somewhat better.
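One way the lock-free queue mentioned above could look is sketched below. This is only an illustration under stated assumptions, not code from the PR or from brpc: the class name `MpscBatchQueue` and its methods are hypothetical. It lets many producers push with a compare-and-swap and lets a single consumer detach the whole pending batch with one atomic exchange. It replaces only the mutex-protected deque; a real worker would still need some way to sleep and be woken when the queue is empty.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical multi-producer / single-consumer queue (names are not from the PR).
template <typename T>
class MpscBatchQueue {
public:
    MpscBatchQueue() : _head(nullptr) {}
    ~MpscBatchQueue() { PopAll(); }  // frees any nodes still pending

    // May be called concurrently by any number of producer threads.
    void Push(T value) {
        Node* node = new Node{std::move(value), nullptr};
        node->next = _head.load(std::memory_order_relaxed);
        // On failure, compare_exchange_weak stores the current head into
        // node->next, so the loop simply retries with the fresh value.
        while (!_head.compare_exchange_weak(node->next, node,
                                            std::memory_order_release,
                                            std::memory_order_relaxed)) {
        }
    }

    // Called by the single consumer: detaches everything pushed so far
    // and returns it in FIFO order.
    std::vector<T> PopAll() {
        Node* node = _head.exchange(nullptr, std::memory_order_acquire);
        std::vector<T> batch;
        while (node != nullptr) {
            batch.push_back(std::move(node->value));
            Node* next = node->next;
            delete node;
            node = next;
        }
        // Nodes were linked newest-first, so reverse to restore push order.
        std::reverse(batch.begin(), batch.end());
        return batch;
    }

private:
    struct Node {
        T value;
        Node* next;
    };
    std::atomic<Node*> _head;
};
```

Because the consumer always takes the entire list in one exchange and producers never dereference the old head, this design avoids the ABA problem that a pop-one-at-a-time lock-free stack would have to handle.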
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1562762813 Right. For example, suppose the service has 3 methods and there are 10 worker threads in total. method1 has no thread pool configured, method2 is configured with 2, and method3 with 3. This guarantees that method2 and method3 together can use at most 5 workers; if method1 receives too many requests, method2 and method3 will get fewer than 5.
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1562753543 With a pthread we can guarantee that it only executes UserCode tasks. With an ExecutionQueue, however, the bthread may be switched out midway and other bthread tasks may be scheduled in, so doesn't that still fail to solve the isolation problem?
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1562734266 @wwbmmm @serverglen @chenBright I have had some further thoughts on this feature and would like to discuss them with you. Currently UserCodeThreadWorker is a pthread, separate from the brpc workers. I think that if UserCodeThreadWorker were an ExecutionQueue instead, it could share the brpc workers. Configuring a different number of ExecutionQueues for each method would then allocate the brpc workers among the methods. The advantage is that user code still runs in a bthread context. The drawback is that blocking system calls would block the worker, but that is inherent to bthread anyway. I would like to hear your opinions.
Re: [PR] add usercode thread pool for method (brpc)
serverglen commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1204945239

## example/usercode_thread_echo_c++/server.cpp: ##
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A server to receive EchoRequest and send back EchoResponse.
+
+#include
+#include
+#include
+#include "echo.pb.h"
+
+DEFINE_bool(echo_attachment, true, "Echo attachment as well");
+DEFINE_int32(port, 8000, "TCP Port of this server");
+DEFINE_string(listen_addr, "",
+              "Server listen address, may be IPV4/IPV6/UDS."
+              " If this is set, the flag port will be ignored");
+DEFINE_int32(idle_timeout_s, -1,
+             "Connection will be closed if there is no "
+             "read/write operations during the last `idle_timeout_s'");
+DEFINE_int32(logoff_ms, 2000,
+             "Maximum duration of server's LOGOFF state "
+             "(waiting for client to close connection before server stops)");
+DEFINE_int32(num_threads1, 2, "thread number for pool1");
+DEFINE_int32(num_threads2, 2, "thread number for pool2");
+
+// Your implementation of example::EchoService
+// Notice that implementing brpc::Describable grants the ability to put
+// additional information in /status.
+namespace example {
+butil::atomic ntls(0);
+struct MyThreadLocalData {
+    MyThreadLocalData() : y(0) {
+        ntls.fetch_add(1, butil::memory_order_relaxed);
+    }
+    ~MyThreadLocalData() { ntls.fetch_sub(1, butil::memory_order_relaxed); }
+    static void deleter(void* d) { delete static_cast(d); }
+
+    int y;
+};
+
+class MyThreadLocalDataFactory : public brpc::DataFactory {
+public:
+    void* CreateData() const { return new MyThreadLocalData; }
+
+    void DestroyData(void* d) const { MyThreadLocalData::deleter(d); }
+};
+
+class EchoServiceImpl : public EchoService {
+public:
+    EchoServiceImpl(){};
+    virtual ~EchoServiceImpl(){};
+    virtual void Echo(google::protobuf::RpcController* cntl_base,
+                      const EchoRequest* request, EchoResponse* response,
+                      google::protobuf::Closure* done) {
+        // This object helps you to call done->Run() in RAII style. If you need
+        // to process the request asynchronously, pass done_guard.release().
+        brpc::ClosureGuard done_guard(done);
+
+        brpc::Controller* cntl = static_cast(cntl_base);
+
+        MyThreadLocalData* tls =
+            static_cast(brpc::thread_local_data());
+        if (tls == NULL) {
+            cntl->SetFailed(
+                "Require ServerOptions.thread_local_data_factory "
+                "to be set with a correctly implemented instance");
+            LOG(ERROR) << cntl->ErrorText();
+            return;
+        }
+        // The purpose of following logs is to help you to understand
+        // how clients interact with servers more intuitively. You should
+        // remove these logs in performance-sensitive servers.
+        LOG(INFO) << "Received request[log_id=" << cntl->log_id() << "] from "
+                  << cntl->remote_side() << " to " << cntl->local_side() << ": "
+                  << request->message()
+                  << " (attached=" << cntl->request_attachment() << ")";
+
+        // Fill response.
+        response->set_message(request->message());
+
+        // You can compress the response by setting Controller, but be aware
+        // that compression may be costly, evaluate before turning on.
+        // cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);
+
+        if (FLAGS_echo_attachment) {
+            // Set attachment which is wired to network directly instead of
+            // being serialized into protobuf messages.
+            cntl->response_attachment().append(cntl->request_attachment());
+        }
+    }
+    virtual void Echo2(google::protobuf::RpcController* cntl_base,
+                       const EchoRequest* request, EchoResponse* response,
+                       google::protobuf::Closure* done) {
+        // This object helps you to call done->Run() in RAII style. If you need
+        // to process the request asynchronously, pass done_guard.release().
+        brpc::ClosureGuard done_guard(done);
+
Re: [PR] add usercode thread pool for method (brpc)
serverglen commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1202585354

## src/brpc/usercode_thread_pool.h: ##
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h" // [CaseIgnored]FlatMap
+
+namespace brpc {
+// Store pending user code.
+struct UserCodeTask {
+    void (*fn)(void*);
+    void* arg;
+    void* assigned_data;
+};
+
+class UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadAssignPolicy() {}
+    virtual ~UserCodeThreadAssignPolicy() {}
+    virtual size_t Index(void* arg, size_t range) = 0;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadAssignPolicy);
+};
+
+class UserCodeThreadRandomAssignPolicy : public UserCodeThreadAssignPolicy {

Review Comment:
Would it be slightly better to add a `const UserCodeThreadAssignPolicy* DefaultUserCodeThreadAssignPolicy()` function, similar to DefaultRetryPolicy()?
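The accessor suggested in this review comment could be sketched roughly as below. This is a simplified, self-contained illustration and not the PR's code: `AssignPolicy`, `RandomAssignPolicy` and `DefaultAssignPolicy` are hypothetical stand-ins for the PR's classes, and `Index()` is declared `const` here (an assumption) so that it can be called through the `const` pointer the reviewer proposes.

```cpp
#include <cassert>
#include <cstddef>
#include <random>

// Hypothetical stand-in for UserCodeThreadAssignPolicy. Index() is const
// in this sketch so it can be used through a pointer-to-const.
class AssignPolicy {
public:
    virtual ~AssignPolicy() {}
    // Returns a worker index in [0, range).
    virtual size_t Index(void* arg, size_t range) const = 0;
};

// Hypothetical stand-in for UserCodeThreadRandomAssignPolicy.
class RandomAssignPolicy : public AssignPolicy {
public:
    size_t Index(void* /*arg*/, size_t range) const override {
        if (range == 0) {
            return 0;
        }
        // One generator per thread, so concurrent callers do not share state.
        thread_local std::mt19937_64 rng(std::random_device{}());
        return std::uniform_int_distribution<size_t>(0, range - 1)(rng);
    }
};

// Accessor in the spirit of the reviewer's suggestion: a single shared
// default instance. A function-local static is initialized exactly once,
// and that initialization is thread-safe since C++11.
const AssignPolicy* DefaultAssignPolicy() {
    static RandomAssignPolicy policy;
    return &policy;
}
```

With an accessor like this, callers that do not care about the assignment strategy can pass the shared default instead of allocating and owning a policy object themselves.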
Re: [PR] add usercode thread pool for method (brpc)
serverglen commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1202566928

## src/brpc/usercode_thread_pool.cpp: ##
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"

Review Comment:
Please move the brpc/usercode_thread_pool.h include to the bottom of the include list.
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1558722470 LGTM
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1201470417

## src/brpc/server.h: ##
@@ -379,6 +380,7 @@ class Server {
         const google::protobuf::MethodDescriptor* method;
         MethodStatus* status;
         AdaptiveMaxConcurrency max_concurrency;
+        UserCodeThreadPool* thread_pool;

Review Comment:
A member is added here, but it is not initialized in the MethodProperty constructor.
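The concern about the uninitialized member can be illustrated with a small sketch. The types below are hypothetical simplifications (`ThreadPoolStub`, `MethodStatusStub`, `MethodPropertySketch`, `HasDedicatedPool`), not the real `Server::MethodProperty`; the point is only that a raw pointer member needs an explicit initializer before code can safely test it against null.

```cpp
#include <cassert>

// Hypothetical placeholders for the real brpc types.
struct ThreadPoolStub {};
struct MethodStatusStub {};

// Simplified stand-in for Server::MethodProperty.
struct MethodPropertySketch {
    MethodStatusStub* status;
    ThreadPoolStub* thread_pool;

    // Without this initializer list the pointers would hold indeterminate
    // values for a default-initialized object, and the null check below
    // would be undefined behavior.
    MethodPropertySketch() : status(nullptr), thread_pool(nullptr) {}
};

// A dispatch path might test the pointer to decide whether a method has
// its own pool; that test is only meaningful if the member was initialized.
bool HasDedicatedPool(const MethodPropertySketch& mp) {
    return mp.thread_pool != nullptr;
}
```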
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1554164103
> The BuiltinServiceTest.rpcz case keeps failing. Could you check whether it is related to the code changed in this PR?
I cannot reproduce it locally. In theory, if no thread pool is configured, this code path is never taken.
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1554079245 The BuiltinServiceTest.rpcz case keeps failing. Could you check whether it is related to the code changed in this PR?
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1550911590
> Is only the baidu protocol supported? Other protocols should be supported too, right?
Once the main body of the code has passed review, I will add support for the other protocols.
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1550910750
> Also, could you add some unit tests?
Done.
Re: [PR] add usercode thread pool for method (brpc)
chenBright commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1549549620 Is only the baidu protocol supported? Other protocols should be supported too, right?
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1549494144 Also, could you add some unit tests?
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1195028211

## src/brpc/usercode_thread_pool.h: ##
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h" // [CaseIgnored]FlatMap
+
+namespace brpc {
+// Store pending user code.
+struct UserCodeTask {
+    void (*fn)(void*);
+    void* arg;
+    void* assigned_data;
+};
+
+class UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadAssignPolicy() {}
+    virtual ~UserCodeThreadAssignPolicy() {}
+    virtual size_t Index(void* arg, size_t range) = 0;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadAssignPolicy);
+};
+
+class UserCodeThreadRandomAssignPolicy : public UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadRandomAssignPolicy() {}
+    virtual ~UserCodeThreadRandomAssignPolicy() {}
+    size_t Index(void* arg, size_t range) override;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadRandomAssignPolicy);
+};
+
+class UserCodeThreadPool;
+class UserCodeThreadWorker {
+public:
+    UserCodeThreadWorker(UserCodeThreadPool* pool);
+    void UserCodeRun(UserCodeTask&& usercode);
+    void UserCodeLoop();
+    void Start();
+    void Stop();
+    void Join();
+
+private:
+    UserCodeThreadPool* _pool;
+    std::deque _queue;
+    std::mutex _mutex;
+    std::condition_variable _cond;
+    std::thread _worker;
+    std::atomic _running; // running flag
+};
+// "user code thread pool" configuration

Review Comment:
Add a blank line here.

## src/brpc/usercode_thread_pool.cpp: ##
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include
+#include
+#include
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Bthread local storage
+extern __thread bthread::LocalStorage tls_bls;
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+} // namespace bthread
+
+namespace brpc {
+
+DEFINE_int32(usercode_thread_pool_map_nbucket, 64 * 2,
+             "usercode thread pool map bucket size");
+
+static void* UserCodeRunner(void* args) {
+    static_cast(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto worker_id = _pool->NextWorkerId();
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%ld", pool_name.c_str(), worker_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque usercodes;
+        {
+            std::unique_lock lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191965402

## src/brpc/usercode_thread_pool.cpp: ##
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include
+#include
+#include
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+} // namespace bthread
+namespace brpc {
+
+std::atomic UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque usercodes;
+        {
+            std::unique_lock lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast*>(arg)->get_value() / 100.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::stri
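The worker loop quoted above follows a common pattern: producers append to a mutex-protected deque, and the worker wakes on a condition variable, takes the whole pending batch in one step, and runs it outside the lock. A minimal standard-library sketch of that pattern follows. `BatchWorker` and its methods are hypothetical names, not the PR's classes, and the sketch deliberately differs from the quoted code in one respect: it drains tasks that are still queued when a stop is requested instead of exiting immediately.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-thread worker illustrating the batch-swap loop.
class BatchWorker {
public:
    BatchWorker() : _running(true), _thread(&BatchWorker::Loop, this) {}
    ~BatchWorker() { StopAndJoin(); }

    void Submit(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _queue.push_back(std::move(task));
        }
        _cond.notify_one();
    }

    // Safe to call more than once; the second call finds nothing to join.
    void StopAndJoin() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _running = false;
        }
        _cond.notify_one();
        if (_thread.joinable()) {
            _thread.join();
        }
    }

private:
    void Loop() {
        while (true) {
            std::deque<std::function<void()>> batch;
            {
                std::unique_lock<std::mutex> lk(_mutex);
                _cond.wait(lk, [this] { return !_queue.empty() || !_running; });
                if (_queue.empty()) {
                    break;  // woken only because a stop was requested
                }
                batch.swap(_queue);  // take every pending task at once
            }
            // Run the tasks without holding the lock so producers are not blocked.
            for (auto& task : batch) {
                task();
            }
        }
    }

    std::mutex _mutex;
    std::condition_variable _cond;
    std::deque<std::function<void()>> _queue;
    bool _running;        // guarded by _mutex
    std::thread _thread;  // declared last so the other members exist before Loop() starts
};
```

Keeping `_running` under the same mutex as the queue, rather than in a separate relaxed atomic, makes the wake-up condition easy to reason about: the predicate always sees a consistent view of both the flag and the queue.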
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191936718

## src/brpc/usercode_thread_pool.cpp: ##
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include
+#include
+#include
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+} // namespace bthread
+namespace brpc {
+
+std::atomic UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque usercodes;
+        {
+            std::unique_lock lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast*>(arg)->get_value() / 100.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1191908472 ## src/brpc/usercode_thread_pool.cpp: ## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/usercode_thread_pool.h" +#include +#include +#include +#include "butil/threading/platform_thread.h" + +namespace bthread { +// Defined in bthread/task_control.cpp +void run_worker_startfn(); +} // namespace bthread +namespace brpc { + +std::atomic UserCodeThreadWorker::_next_worker_id(0); + +static void* UserCodeRunner(void* args) { +static_cast(args)->UserCodeLoop(); +return NULL; +} + +UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool) +: _pool(pool), _running(true) {} + +// Entry of backup thread for running user code. 
+void UserCodeThreadWorker::UserCodeLoop() { +auto pool_name = _pool->pool_name(); +auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed); +std::string thread_name = +butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id); +butil::PlatformThread::SetName(thread_name.c_str()); +auto startfn = _pool->thread_startfn(); +if (startfn) { +startfn(); +} else { +bthread::run_worker_startfn(); +} + +VLOG(1) << "start thread " << thread_name; + +int64_t last_time = butil::cpuwide_time_us(); +while (true) { +bool blocked = false; +std::deque usercodes; +{ +std::unique_lock lk(_mutex); +_cond.wait(lk, [&]() { +if (!_queue.empty() || +!_running.load(std::memory_order_relaxed)) { +return true; +} else { +blocked = true; +return false; +} +}); +if (!_running.load(std::memory_order_relaxed)) { +break; +} +usercodes = std::move(_queue); +_queue = {}; +} +const int64_t begin_time = +(blocked ? butil::cpuwide_time_us() : last_time); +for (auto& usercode : usercodes) { +bthread::tls_bls = usercode.tls_bls; +usercode.fn(usercode.arg); +} +const int64_t end_time = butil::cpuwide_time_us(); +_pool->inpool_count << usercodes.size(); +_pool->inpool_elapse_us << (end_time - begin_time); +last_time = end_time; +} + +VLOG(1) << "exit thread " << thread_name; +} + +void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) { +std::unique_lock lk(_mutex); +_queue.emplace_back(std::move(usercode)); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Start() { +_worker = std::thread(UserCodeRunner, this); +} + +void UserCodeThreadWorker::Stop() { +_running.store(false, std::memory_order_relaxed); +std::unique_lock lk(_mutex); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Join() { +if (_worker.joinable()) { +_worker.join(); +} +} + +double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) { +return static_cast*>(arg)->get_value() / 100.0; +} + +size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) { +auto pool = static_cast(arg); +return 
pool->_workers.size(); +} + +UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name, + const std::function& startfn, + UserCodeThreadAssignPolicy* policy) +: inpool_per_second("rpc_usercode_thread_pool_second", pool_name, +&inpool_count), + inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us), + pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s, + 1), + thread_count("rpc_usercode_thread_num_threads", pool_name, + GetUserCodeThreadSize, this), + _pool_name(pool_name), + _thread_startfn(startfn), + _assign_policy(policy), + _gflag_num_threads(nullptr) {} + +UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); } + +// TODO(yangliming): use c++17 std::variant +bool UserCodeThreadPool::Init(const std::stri
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1191867372 ## src/brpc/usercode_thread_pool.cpp: ## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/usercode_thread_pool.h" +#include +#include +#include +#include "butil/threading/platform_thread.h" + +namespace bthread { +// Defined in bthread/task_control.cpp +void run_worker_startfn(); +} // namespace bthread +namespace brpc { + +std::atomic UserCodeThreadWorker::_next_worker_id(0); + +static void* UserCodeRunner(void* args) { +static_cast(args)->UserCodeLoop(); +return NULL; +} + +UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool) +: _pool(pool), _running(true) {} + +// Entry of backup thread for running user code. 
+void UserCodeThreadWorker::UserCodeLoop() { +auto pool_name = _pool->pool_name(); +auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed); +std::string thread_name = +butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id); +butil::PlatformThread::SetName(thread_name.c_str()); +auto startfn = _pool->thread_startfn(); +if (startfn) { +startfn(); +} else { +bthread::run_worker_startfn(); +} + +VLOG(1) << "start thread " << thread_name; + +int64_t last_time = butil::cpuwide_time_us(); +while (true) { +bool blocked = false; +std::deque usercodes; +{ +std::unique_lock lk(_mutex); +_cond.wait(lk, [&]() { +if (!_queue.empty() || +!_running.load(std::memory_order_relaxed)) { +return true; +} else { +blocked = true; +return false; +} +}); +if (!_running.load(std::memory_order_relaxed)) { +break; +} +usercodes = std::move(_queue); +_queue = {}; +} +const int64_t begin_time = +(blocked ? butil::cpuwide_time_us() : last_time); +for (auto& usercode : usercodes) { +bthread::tls_bls = usercode.tls_bls; +usercode.fn(usercode.arg); +} +const int64_t end_time = butil::cpuwide_time_us(); +_pool->inpool_count << usercodes.size(); +_pool->inpool_elapse_us << (end_time - begin_time); +last_time = end_time; +} + +VLOG(1) << "exit thread " << thread_name; +} + +void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) { +std::unique_lock lk(_mutex); +_queue.emplace_back(std::move(usercode)); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Start() { +_worker = std::thread(UserCodeRunner, this); +} + +void UserCodeThreadWorker::Stop() { +_running.store(false, std::memory_order_relaxed); +std::unique_lock lk(_mutex); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Join() { +if (_worker.joinable()) { +_worker.join(); +} +} + +double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) { +return static_cast*>(arg)->get_value() / 100.0; +} + +size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) { +auto pool = static_cast(arg); +return 
pool->_workers.size(); +} + +UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name, + const std::function& startfn, + UserCodeThreadAssignPolicy* policy) +: inpool_per_second("rpc_usercode_thread_pool_second", pool_name, +&inpool_count), + inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us), + pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s, + 1), + thread_count("rpc_usercode_thread_num_threads", pool_name, + GetUserCodeThreadSize, this), + _pool_name(pool_name), + _thread_startfn(startfn), + _assign_policy(policy), + _gflag_num_threads(nullptr) {} + +UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); } + +// TODO(yangliming): use c++17 std::variant +bool UserCodeThreadPool::Init(const std
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1191867173 ## src/brpc/usercode_thread_pool.cpp: ## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/usercode_thread_pool.h" +#include +#include +#include +#include "butil/threading/platform_thread.h" + +namespace bthread { +// Defined in bthread/task_control.cpp +void run_worker_startfn(); +} // namespace bthread +namespace brpc { + +std::atomic UserCodeThreadWorker::_next_worker_id(0); + +static void* UserCodeRunner(void* args) { +static_cast(args)->UserCodeLoop(); +return NULL; +} + +UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool) +: _pool(pool), _running(true) {} + +// Entry of backup thread for running user code. 
+void UserCodeThreadWorker::UserCodeLoop() { +auto pool_name = _pool->pool_name(); +auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed); +std::string thread_name = +butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id); +butil::PlatformThread::SetName(thread_name.c_str()); +auto startfn = _pool->thread_startfn(); +if (startfn) { +startfn(); +} else { +bthread::run_worker_startfn(); +} + +VLOG(1) << "start thread " << thread_name; + +int64_t last_time = butil::cpuwide_time_us(); +while (true) { +bool blocked = false; +std::deque usercodes; +{ +std::unique_lock lk(_mutex); +_cond.wait(lk, [&]() { +if (!_queue.empty() || +!_running.load(std::memory_order_relaxed)) { +return true; +} else { +blocked = true; +return false; +} +}); +if (!_running.load(std::memory_order_relaxed)) { +break; +} +usercodes = std::move(_queue); +_queue = {}; +} +const int64_t begin_time = +(blocked ? butil::cpuwide_time_us() : last_time); +for (auto& usercode : usercodes) { +bthread::tls_bls = usercode.tls_bls; Review Comment: OK
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1191866701 ## src/brpc/usercode_thread_pool.cpp: ## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/usercode_thread_pool.h" +#include +#include +#include +#include "butil/threading/platform_thread.h" + +namespace bthread { +// Defined in bthread/task_control.cpp +void run_worker_startfn(); +} // namespace bthread +namespace brpc { + +std::atomic UserCodeThreadWorker::_next_worker_id(0); + +static void* UserCodeRunner(void* args) { +static_cast(args)->UserCodeLoop(); +return NULL; +} + +UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool) +: _pool(pool), _running(true) {} + +// Entry of backup thread for running user code. 
+void UserCodeThreadWorker::UserCodeLoop() { +auto pool_name = _pool->pool_name(); +auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed); +std::string thread_name = +butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id); +butil::PlatformThread::SetName(thread_name.c_str()); +auto startfn = _pool->thread_startfn(); +if (startfn) { +startfn(); +} else { +bthread::run_worker_startfn(); +} + +VLOG(1) << "start thread " << thread_name; + +int64_t last_time = butil::cpuwide_time_us(); +while (true) { +bool blocked = false; +std::deque usercodes; +{ +std::unique_lock lk(_mutex); +_cond.wait(lk, [&]() { +if (!_queue.empty() || +!_running.load(std::memory_order_relaxed)) { +return true; +} else { +blocked = true; +return false; +} +}); +if (!_running.load(std::memory_order_relaxed)) { +break; +} +usercodes = std::move(_queue); +_queue = {}; +} +const int64_t begin_time = +(blocked ? butil::cpuwide_time_us() : last_time); +for (auto& usercode : usercodes) { +bthread::tls_bls = usercode.tls_bls; +usercode.fn(usercode.arg); +} +const int64_t end_time = butil::cpuwide_time_us(); +_pool->inpool_count << usercodes.size(); +_pool->inpool_elapse_us << (end_time - begin_time); +last_time = end_time; +} + +VLOG(1) << "exit thread " << thread_name; +} + +void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) { +std::unique_lock lk(_mutex); +_queue.emplace_back(std::move(usercode)); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Start() { +_worker = std::thread(UserCodeRunner, this); +} + +void UserCodeThreadWorker::Stop() { +_running.store(false, std::memory_order_relaxed); +std::unique_lock lk(_mutex); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Join() { +if (_worker.joinable()) { +_worker.join(); +} +} + +double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) { +return static_cast*>(arg)->get_value() / 100.0; +} + +size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) { +auto pool = static_cast(arg); +return 
pool->_workers.size(); +} + +UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name, + const std::function& startfn, + UserCodeThreadAssignPolicy* policy) +: inpool_per_second("rpc_usercode_thread_pool_second", pool_name, +&inpool_count), + inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us), + pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s, + 1), + thread_count("rpc_usercode_thread_num_threads", pool_name, + GetUserCodeThreadSize, this), + _pool_name(pool_name), + _thread_startfn(startfn), + _assign_policy(policy), + _gflag_num_threads(nullptr) {} + +UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); } + +// TODO(yangliming): use c++17 std::variant +bool UserCodeThreadPool::Init(const std
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1191854729 ## src/brpc/usercode_thread_pool.cpp: ## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/usercode_thread_pool.h" +#include +#include +#include +#include "butil/threading/platform_thread.h" + +namespace bthread { +// Defined in bthread/task_control.cpp +void run_worker_startfn(); +} // namespace bthread +namespace brpc { + +std::atomic UserCodeThreadWorker::_next_worker_id(0); + +static void* UserCodeRunner(void* args) { +static_cast(args)->UserCodeLoop(); +return NULL; +} + +UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool) +: _pool(pool), _running(true) {} + +// Entry of backup thread for running user code. 
+void UserCodeThreadWorker::UserCodeLoop() { +auto pool_name = _pool->pool_name(); +auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed); +std::string thread_name = +butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id); +butil::PlatformThread::SetName(thread_name.c_str()); +auto startfn = _pool->thread_startfn(); +if (startfn) { +startfn(); +} else { +bthread::run_worker_startfn(); +} + +VLOG(1) << "start thread " << thread_name; + +int64_t last_time = butil::cpuwide_time_us(); +while (true) { +bool blocked = false; +std::deque usercodes; +{ +std::unique_lock lk(_mutex); +_cond.wait(lk, [&]() { +if (!_queue.empty() || +!_running.load(std::memory_order_relaxed)) { +return true; +} else { +blocked = true; +return false; +} +}); +if (!_running.load(std::memory_order_relaxed)) { +break; +} +usercodes = std::move(_queue); +_queue = {}; +} +const int64_t begin_time = +(blocked ? butil::cpuwide_time_us() : last_time); +for (auto& usercode : usercodes) { +bthread::tls_bls = usercode.tls_bls; +usercode.fn(usercode.arg); +} +const int64_t end_time = butil::cpuwide_time_us(); +_pool->inpool_count << usercodes.size(); +_pool->inpool_elapse_us << (end_time - begin_time); +last_time = end_time; +} + +VLOG(1) << "exit thread " << thread_name; +} + +void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) { +std::unique_lock lk(_mutex); +_queue.emplace_back(std::move(usercode)); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Start() { +_worker = std::thread(UserCodeRunner, this); +} + +void UserCodeThreadWorker::Stop() { +_running.store(false, std::memory_order_relaxed); +std::unique_lock lk(_mutex); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Join() { +if (_worker.joinable()) { +_worker.join(); +} +} + +double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) { +return static_cast*>(arg)->get_value() / 100.0; +} + +size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) { +auto pool = static_cast(arg); +return 
pool->_workers.size(); +} + +UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name, + const std::function& startfn, + UserCodeThreadAssignPolicy* policy) +: inpool_per_second("rpc_usercode_thread_pool_second", pool_name, +&inpool_count), + inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us), + pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s, + 1), + thread_count("rpc_usercode_thread_num_threads", pool_name, + GetUserCodeThreadSize, this), + _pool_name(pool_name), + _thread_startfn(startfn), + _assign_policy(policy), + _gflag_num_threads(nullptr) {} + +UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); } + +// TODO(yangliming): use c++17 std::variant +bool UserCodeThreadPool::Init(const std::stri
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1191120117 ## src/brpc/usercode_thread_pool.cpp: ## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/usercode_thread_pool.h" +#include +#include +#include +#include "butil/threading/platform_thread.h" + +namespace bthread { +// Defined in bthread/task_control.cpp +void run_worker_startfn(); +} // namespace bthread +namespace brpc { + +std::atomic UserCodeThreadWorker::_next_worker_id(0); + +static void* UserCodeRunner(void* args) { +static_cast(args)->UserCodeLoop(); +return NULL; +} + +UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool) +: _pool(pool), _running(true) {} + +// Entry of backup thread for running user code. 
+void UserCodeThreadWorker::UserCodeLoop() { +auto pool_name = _pool->pool_name(); +auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed); +std::string thread_name = +butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id); +butil::PlatformThread::SetName(thread_name.c_str()); +auto startfn = _pool->thread_startfn(); +if (startfn) { +startfn(); +} else { +bthread::run_worker_startfn(); +} + +VLOG(1) << "start thread " << thread_name; + +int64_t last_time = butil::cpuwide_time_us(); +while (true) { +bool blocked = false; +std::deque usercodes; +{ +std::unique_lock lk(_mutex); +_cond.wait(lk, [&]() { +if (!_queue.empty() || +!_running.load(std::memory_order_relaxed)) { +return true; +} else { +blocked = true; +return false; +} +}); +if (!_running.load(std::memory_order_relaxed)) { +break; +} +usercodes = std::move(_queue); +_queue = {}; +} +const int64_t begin_time = +(blocked ? butil::cpuwide_time_us() : last_time); +for (auto& usercode : usercodes) { +bthread::tls_bls = usercode.tls_bls; +usercode.fn(usercode.arg); +} +const int64_t end_time = butil::cpuwide_time_us(); +_pool->inpool_count << usercodes.size(); +_pool->inpool_elapse_us << (end_time - begin_time); +last_time = end_time; +} + +VLOG(1) << "exit thread " << thread_name; +} + +void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) { +std::unique_lock lk(_mutex); +_queue.emplace_back(std::move(usercode)); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Start() { +_worker = std::thread(UserCodeRunner, this); +} + +void UserCodeThreadWorker::Stop() { +_running.store(false, std::memory_order_relaxed); +std::unique_lock lk(_mutex); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Join() { +if (_worker.joinable()) { +_worker.join(); +} +} + +double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) { +return static_cast*>(arg)->get_value() / 100.0; +} + +size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) { +auto pool = static_cast(arg); +return 
pool->_workers.size(); +} + +UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name, + const std::function& startfn, + UserCodeThreadAssignPolicy* policy) +: inpool_per_second("rpc_usercode_thread_pool_second", pool_name, +&inpool_count), + inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us), + pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s, + 1), + thread_count("rpc_usercode_thread_num_threads", pool_name, + GetUserCodeThreadSize, this), + _pool_name(pool_name), + _thread_startfn(startfn), + _assign_policy(policy), + _gflag_num_threads(nullptr) {} + +UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); } + +// TODO(yangliming): use c++17 std::variant +bool UserCodeThreadPool::Init(const std
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1191120948 ## src/brpc/usercode_thread_pool.cpp: ## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/usercode_thread_pool.h" +#include +#include +#include +#include "butil/threading/platform_thread.h" + +namespace bthread { +// Defined in bthread/task_control.cpp +void run_worker_startfn(); +} // namespace bthread +namespace brpc { + +std::atomic UserCodeThreadWorker::_next_worker_id(0); + +static void* UserCodeRunner(void* args) { +static_cast(args)->UserCodeLoop(); +return NULL; +} + +UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool) +: _pool(pool), _running(true) {} + +// Entry of backup thread for running user code. 
+void UserCodeThreadWorker::UserCodeLoop() { +auto pool_name = _pool->pool_name(); +auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed); +std::string thread_name = +butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id); +butil::PlatformThread::SetName(thread_name.c_str()); +auto startfn = _pool->thread_startfn(); +if (startfn) { +startfn(); +} else { +bthread::run_worker_startfn(); +} + +VLOG(1) << "start thread " << thread_name; + +int64_t last_time = butil::cpuwide_time_us(); +while (true) { +bool blocked = false; +std::deque usercodes; +{ +std::unique_lock lk(_mutex); +_cond.wait(lk, [&]() { +if (!_queue.empty() || +!_running.load(std::memory_order_relaxed)) { +return true; +} else { +blocked = true; +return false; +} +}); +if (!_running.load(std::memory_order_relaxed)) { +break; +} +usercodes = std::move(_queue); +_queue = {}; +} +const int64_t begin_time = +(blocked ? butil::cpuwide_time_us() : last_time); +for (auto& usercode : usercodes) { +bthread::tls_bls = usercode.tls_bls; +usercode.fn(usercode.arg); +} +const int64_t end_time = butil::cpuwide_time_us(); +_pool->inpool_count << usercodes.size(); +_pool->inpool_elapse_us << (end_time - begin_time); +last_time = end_time; +} + +VLOG(1) << "exit thread " << thread_name; +} + +void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) { +std::unique_lock lk(_mutex); +_queue.emplace_back(std::move(usercode)); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Start() { +_worker = std::thread(UserCodeRunner, this); +} + +void UserCodeThreadWorker::Stop() { +_running.store(false, std::memory_order_relaxed); +std::unique_lock lk(_mutex); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Join() { +if (_worker.joinable()) { +_worker.join(); +} +} + +double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) { +return static_cast*>(arg)->get_value() / 100.0; +} + +size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) { +auto pool = static_cast(arg); +return 
pool->_workers.size(); +} + +UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name, + const std::function& startfn, + UserCodeThreadAssignPolicy* policy) +: inpool_per_second("rpc_usercode_thread_pool_second", pool_name, +&inpool_count), + inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us), + pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s, + 1), + thread_count("rpc_usercode_thread_num_threads", pool_name, + GetUserCodeThreadSize, this), + _pool_name(pool_name), + _thread_startfn(startfn), + _assign_policy(policy), + _gflag_num_threads(nullptr) {} + +UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); } + +// TODO(yangliming): use c++17 std::variant +bool UserCodeThreadPool::Init(const std
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1191120117 ## src/brpc/usercode_thread_pool.cpp: ## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/usercode_thread_pool.h" +#include +#include +#include +#include "butil/threading/platform_thread.h" + +namespace bthread { +// Defined in bthread/task_control.cpp +void run_worker_startfn(); +} // namespace bthread +namespace brpc { + +std::atomic UserCodeThreadWorker::_next_worker_id(0); + +static void* UserCodeRunner(void* args) { +static_cast(args)->UserCodeLoop(); +return NULL; +} + +UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool) +: _pool(pool), _running(true) {} + +// Entry of backup thread for running user code. 
+void UserCodeThreadWorker::UserCodeLoop() { +auto pool_name = _pool->pool_name(); +auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed); +std::string thread_name = +butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id); +butil::PlatformThread::SetName(thread_name.c_str()); +auto startfn = _pool->thread_startfn(); +if (startfn) { +startfn(); +} else { +bthread::run_worker_startfn(); +} + +VLOG(1) << "start thread " << thread_name; + +int64_t last_time = butil::cpuwide_time_us(); +while (true) { +bool blocked = false; +std::deque usercodes; +{ +std::unique_lock lk(_mutex); +_cond.wait(lk, [&]() { +if (!_queue.empty() || +!_running.load(std::memory_order_relaxed)) { +return true; +} else { +blocked = true; +return false; +} +}); +if (!_running.load(std::memory_order_relaxed)) { +break; +} +usercodes = std::move(_queue); +_queue = {}; +} +const int64_t begin_time = +(blocked ? butil::cpuwide_time_us() : last_time); +for (auto& usercode : usercodes) { +bthread::tls_bls = usercode.tls_bls; +usercode.fn(usercode.arg); +} +const int64_t end_time = butil::cpuwide_time_us(); +_pool->inpool_count << usercodes.size(); +_pool->inpool_elapse_us << (end_time - begin_time); +last_time = end_time; +} + +VLOG(1) << "exit thread " << thread_name; +} + +void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) { +std::unique_lock lk(_mutex); +_queue.emplace_back(std::move(usercode)); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Start() { +_worker = std::thread(UserCodeRunner, this); +} + +void UserCodeThreadWorker::Stop() { +_running.store(false, std::memory_order_relaxed); +std::unique_lock lk(_mutex); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Join() { +if (_worker.joinable()) { +_worker.join(); +} +} + +double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) { +return static_cast*>(arg)->get_value() / 100.0; +} + +size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) { +auto pool = static_cast(arg); +return 
pool->_workers.size(); +} + +UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name, + const std::function& startfn, + UserCodeThreadAssignPolicy* policy) +: inpool_per_second("rpc_usercode_thread_pool_second", pool_name, +&inpool_count), + inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us), + pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s, + 1), + thread_count("rpc_usercode_thread_num_threads", pool_name, + GetUserCodeThreadSize, this), + _pool_name(pool_name), + _thread_startfn(startfn), + _assign_policy(policy), + _gflag_num_threads(nullptr) {} + +UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); } + +// TODO(yangliming): use c++17 std::variant +bool UserCodeThreadPool::Init(const std
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1191071292 ## src/brpc/usercode_thread_pool.cpp: ## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/usercode_thread_pool.h" +#include +#include +#include +#include "butil/threading/platform_thread.h" + +namespace bthread { +// Defined in bthread/task_control.cpp +void run_worker_startfn(); +} // namespace bthread +namespace brpc { + +std::atomic UserCodeThreadWorker::_next_worker_id(0); + +static void* UserCodeRunner(void* args) { +static_cast(args)->UserCodeLoop(); +return NULL; +} + +UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool) +: _pool(pool), _running(true) {} + +// Entry of backup thread for running user code. 
+void UserCodeThreadWorker::UserCodeLoop() { +auto pool_name = _pool->pool_name(); +auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed); +std::string thread_name = +butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id); +butil::PlatformThread::SetName(thread_name.c_str()); +auto startfn = _pool->thread_startfn(); +if (startfn) { +startfn(); +} else { +bthread::run_worker_startfn(); +} + +VLOG(1) << "start thread " << thread_name; + +int64_t last_time = butil::cpuwide_time_us(); +while (true) { +bool blocked = false; +std::deque usercodes; +{ +std::unique_lock lk(_mutex); +_cond.wait(lk, [&]() { +if (!_queue.empty() || +!_running.load(std::memory_order_relaxed)) { +return true; +} else { +blocked = true; +return false; +} +}); +if (!_running.load(std::memory_order_relaxed)) { +break; +} +usercodes = std::move(_queue); +_queue = {}; +} +const int64_t begin_time = +(blocked ? butil::cpuwide_time_us() : last_time); +for (auto& usercode : usercodes) { +bthread::tls_bls = usercode.tls_bls; +usercode.fn(usercode.arg); +} +const int64_t end_time = butil::cpuwide_time_us(); +_pool->inpool_count << usercodes.size(); +_pool->inpool_elapse_us << (end_time - begin_time); +last_time = end_time; +} + +VLOG(1) << "exit thread " << thread_name; +} + +void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) { +std::unique_lock lk(_mutex); +_queue.emplace_back(std::move(usercode)); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Start() { +_worker = std::thread(UserCodeRunner, this); +} + +void UserCodeThreadWorker::Stop() { +_running.store(false, std::memory_order_relaxed); +std::unique_lock lk(_mutex); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Join() { +if (_worker.joinable()) { +_worker.join(); +} +} + +double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) { +return static_cast*>(arg)->get_value() / 100.0; +} + +size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) { +auto pool = static_cast(arg); +return 
pool->_workers.size(); +} + +UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name, + const std::function& startfn, + UserCodeThreadAssignPolicy* policy) +: inpool_per_second("rpc_usercode_thread_pool_second", pool_name, +&inpool_count), + inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us), + pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s, + 1), + thread_count("rpc_usercode_thread_num_threads", pool_name, + GetUserCodeThreadSize, this), + _pool_name(pool_name), + _thread_startfn(startfn), + _assign_policy(policy), + _gflag_num_threads(nullptr) {} + +UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); } + +// TODO(yangliming): use c++17 std::variant +bool UserCodeThreadPool::Init(const std
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1191065633 ## src/brpc/usercode_thread_pool.h: ## @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_USERCODE_THREAD_POOL_H +#define BRPC_USERCODE_THREAD_POOL_H + +#include +#include +#include +#include +#include +#include +#include "butil/atomicops.h" +#include "bvar/bvar.h" +#include "bthread/task_meta.h" +#include "butil/containers/case_ignored_flat_map.h" // [CaseIgnored]FlatMap +namespace bthread { +extern __thread bthread::LocalStorage tls_bls; Review Comment: This is defined inside the UserCodeTask struct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1191057326 ## src/brpc/usercode_thread_pool.cpp: ## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/usercode_thread_pool.h" +#include +#include +#include +#include "butil/threading/platform_thread.h" + +namespace bthread { +// Defined in bthread/task_control.cpp +void run_worker_startfn(); +} // namespace bthread +namespace brpc { + +std::atomic UserCodeThreadWorker::_next_worker_id(0); + +static void* UserCodeRunner(void* args) { +static_cast(args)->UserCodeLoop(); +return NULL; +} + +UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool) +: _pool(pool), _running(true) {} + +// Entry of backup thread for running user code. 
+void UserCodeThreadWorker::UserCodeLoop() { +auto pool_name = _pool->pool_name(); +auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed); +std::string thread_name = +butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id); +butil::PlatformThread::SetName(thread_name.c_str()); +auto startfn = _pool->thread_startfn(); +if (startfn) { +startfn(); +} else { +bthread::run_worker_startfn(); +} + +VLOG(1) << "start thread " << thread_name; + +int64_t last_time = butil::cpuwide_time_us(); +while (true) { +bool blocked = false; +std::deque usercodes; +{ +std::unique_lock lk(_mutex); +_cond.wait(lk, [&]() { +if (!_queue.empty() || +!_running.load(std::memory_order_relaxed)) { +return true; +} else { +blocked = true; +return false; +} +}); +if (!_running.load(std::memory_order_relaxed)) { +break; +} +usercodes = std::move(_queue); +_queue = {}; +} +const int64_t begin_time = +(blocked ? butil::cpuwide_time_us() : last_time); +for (auto& usercode : usercodes) { +bthread::tls_bls = usercode.tls_bls; +usercode.fn(usercode.arg); +} +const int64_t end_time = butil::cpuwide_time_us(); +_pool->inpool_count << usercodes.size(); +_pool->inpool_elapse_us << (end_time - begin_time); +last_time = end_time; +} + +VLOG(1) << "exit thread " << thread_name; +} + +void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) { +std::unique_lock lk(_mutex); +_queue.emplace_back(std::move(usercode)); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Start() { +_worker = std::thread(UserCodeRunner, this); +} + +void UserCodeThreadWorker::Stop() { +_running.store(false, std::memory_order_relaxed); +std::unique_lock lk(_mutex); +_cond.notify_one(); +} + +void UserCodeThreadWorker::Join() { +if (_worker.joinable()) { +_worker.join(); +} +} + +double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) { +return static_cast*>(arg)->get_value() / 100.0; +} + +size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) { +auto pool = static_cast(arg); +return 
pool->_workers.size(); +} + +UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name, + const std::function& startfn, + UserCodeThreadAssignPolicy* policy) +: inpool_per_second("rpc_usercode_thread_pool_second", pool_name, +&inpool_count), + inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us), + pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s, + 1), + thread_count("rpc_usercode_thread_num_threads", pool_name, + GetUserCodeThreadSize, this), + _pool_name(pool_name), + _thread_startfn(startfn), + _assign_policy(policy), + _gflag_num_threads(nullptr) {} + +UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); } + +// TODO(yangliming): use c++17 std::variant +bool UserCodeThreadPool::Init(const std
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1191056242 ## src/brpc/usercode_thread_pool.cpp: ## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "brpc/usercode_thread_pool.h" +#include +#include +#include +#include "butil/threading/platform_thread.h" + +namespace bthread { +// Defined in bthread/task_control.cpp +void run_worker_startfn(); +} // namespace bthread +namespace brpc { + +std::atomic UserCodeThreadWorker::_next_worker_id(0); + +static void* UserCodeRunner(void* args) { +static_cast(args)->UserCodeLoop(); +return NULL; +} + +UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool) +: _pool(pool), _running(true) {} + +// Entry of backup thread for running user code. 
+void UserCodeThreadWorker::UserCodeLoop() { +auto pool_name = _pool->pool_name(); +auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed); +std::string thread_name = +butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id); +butil::PlatformThread::SetName(thread_name.c_str()); +auto startfn = _pool->thread_startfn(); +if (startfn) { +startfn(); +} else { +bthread::run_worker_startfn(); +} + +VLOG(1) << "start thread " << thread_name; + +int64_t last_time = butil::cpuwide_time_us(); +while (true) { +bool blocked = false; +std::deque usercodes; +{ +std::unique_lock lk(_mutex); +_cond.wait(lk, [&]() { +if (!_queue.empty() || +!_running.load(std::memory_order_relaxed)) { +return true; +} else { +blocked = true; +return false; +} +}); +if (!_running.load(std::memory_order_relaxed)) { +break; +} +usercodes = std::move(_queue); +_queue = {}; +} +const int64_t begin_time = +(blocked ? butil::cpuwide_time_us() : last_time); +for (auto& usercode : usercodes) { +bthread::tls_bls = usercode.tls_bls; Review Comment: Could you explain the scenario this is for?
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1543612341

@yanglimingcn Could you add a usage document for this?
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on code in PR #2215: URL: https://github.com/apache/brpc/pull/2215#discussion_r1190831296 ## src/brpc/usercode_thread_pool.h: ## @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_USERCODE_THREAD_POOL_H +#define BRPC_USERCODE_THREAD_POOL_H + +#include +#include +#include +#include +#include +#include +#include "butil/atomicops.h" +#include "bvar/bvar.h" +#include "bthread/task_meta.h" +#include "butil/containers/case_ignored_flat_map.h" // [CaseIgnored]FlatMap +namespace bthread { +extern __thread bthread::LocalStorage tls_bls; +} +namespace brpc { +// Store pending user code. +struct UserCode { Review Comment: UserCodeTask ## src/brpc/usercode_thread_pool.h: ## @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_USERCODE_THREAD_POOL_H +#define BRPC_USERCODE_THREAD_POOL_H + +#include +#include +#include +#include +#include +#include +#include "butil/atomicops.h" +#include "bvar/bvar.h" +#include "bthread/task_meta.h" +#include "butil/containers/case_ignored_flat_map.h" // [CaseIgnored]FlatMap +namespace bthread { +extern __thread bthread::LocalStorage tls_bls; +} +namespace brpc { +// Store pending user code. +struct UserCode { +void (*fn)(void*); +void* arg; +bthread::LocalStorage tls_bls; +}; + +class UserCodeThreadAssignPolicy { +public: +UserCodeThreadAssignPolicy() {} +virtual ~UserCodeThreadAssignPolicy() {} +virtual size_t Index(void* arg, size_t range) = 0; + +private: +DISALLOW_COPY_AND_ASSIGN(UserCodeThreadAssignPolicy); +}; + +class UserCodeThreadRandomAssignPolicy : public UserCodeThreadAssignPolicy { +public: +UserCodeThreadRandomAssignPolicy() {} +virtual ~UserCodeThreadRandomAssignPolicy() {} +size_t Index(void* arg, size_t range) override; + +private: +DISALLOW_COPY_AND_ASSIGN(UserCodeThreadRandomAssignPolicy); +}; + +class UserCodeThreadPool; +class UserCodeThreadWorker { +public: +UserCodeThreadWorker(UserCodeThreadPool* pool); +void UserCodeRun(UserCode&& usercode); +void UserCodeLoop(); +void Start(); +void Stop(); +void Join(); + +private: +UserCodeThreadPool* _pool; +std::deque _queue; +std::mutex _mutex; +std::condition_variable _cond; +std::thread _worker; +std::atomic _running; // running flag +static std::atomic _next_worker_id; // worker id +}; +// "user code thread pool" configuration +struct 
UserCodeThreadPoolConf { +UserCodeThreadPoolConf(const std::string& pool_name, + const std::string& num_threads, + const std::function& startfn, + UserCodeThreadAssignPolicy* policy) +: pool_name(pool_name), + num_threads(num_threads), + thread_startfn(startfn), + assign_policy(policy) {} +std::string pool_name; // pool name +std::string num_threads;// thread number +std::function thread_startfn; // thread start function +UserCodeThreadAssignPolicy* assign_policy; // thread assign policy +}; +// "user code thread pool" is a set of pthreads to allow run user code in this +// pool for some methods +class UserCodeThreadPool { +static double GetInPoolElapseInSecond(void*); Review Comment: Shouldn't the private members be placed after the public ones? ## src/brpc/usercode_thread_pool.cpp:
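For readers following the assign-policy interface quoted in this review, here is a minimal sketch of how an `Index(arg, range)` implementation might pick a worker. It is an illustration only: `DemoAssignPolicy`, `DemoRoundRobinPolicy` and `DemoAddressHashPolicy` are hypothetical names that merely mirror the shape of `UserCodeThreadAssignPolicy`, and what `arg` actually points to in the PR is not shown in this thread.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>

// Hypothetical interface with the same shape as the quoted one, redeclared
// here so the sketch compiles on its own.
class DemoAssignPolicy {
public:
    virtual ~DemoAssignPolicy() {}
    // Returns a worker index in [0, range). range must be greater than 0.
    virtual std::size_t Index(void* arg, std::size_t range) = 0;
};

// Hypothetical policy: spread tasks evenly, ignoring the argument.
class DemoRoundRobinPolicy : public DemoAssignPolicy {
public:
    DemoRoundRobinPolicy() : _next(0) {}
    std::size_t Index(void* /*arg*/, std::size_t range) override {
        assert(range > 0);
        // Relaxed ordering is enough: only the counter value matters.
        return _next.fetch_add(1, std::memory_order_relaxed) % range;
    }

private:
    std::atomic<std::size_t> _next;
};

// Hypothetical policy: hash the argument's address, so the same argument
// object is always routed to the same worker.
class DemoAddressHashPolicy : public DemoAssignPolicy {
public:
    std::size_t Index(void* arg, std::size_t range) override {
        assert(range > 0);
        return std::hash<void*>()(arg) % range;
    }
};
```

A round-robin policy balances load, while an address-hash policy keeps work for one object on one thread; which of the two (if either) suits the PR depends on details not visible here.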
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1535868157

> > > > I have another question: this ThreadPool is non-copyable, so for now only a pointer to it can be put into the FlatMap. Any good suggestions for making this more elegant?
> > >
> > > @chenBright Any suggestions on this? The map stores raw pointers, so the destructor has to iterate over it and call delete manually, which does not feel very elegant.
> >
> > If you don't want to delete them yourself, use smart pointers. There is no particularly good approach; let's see whether anyone else has suggestions.
>
> Putting unique_ptr into FlatMap doesn't seem to work either. Or should we drop FlatMap and use std::unordered_map?

This is resolved: FlatMap can now store unique_ptr. gejun submitted a patch earlier that adds support for it. The only remaining gap is that the insert interface is not well supported.
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1526872291

> std::unordered_map also requires the value to be copyable, doesn't it?

Something like this works:

    std::vector> _threads; // threads
    _threads.emplace_back(new std::thread(UserCodeRunner, this));
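For reference, a minimal sketch of the ownership pattern under discussion: a standard map that keeps non-copyable pool objects alive through `std::unique_ptr`. When elements are added with `emplace`, `std::unordered_map` only needs the mapped type to be movable, so this compiles as C++11. `DemoPool` and `DemoPoolRegistry` are hypothetical names for illustration, not classes from the PR, and nothing here says anything about how brpc's FlatMap behaves.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for a non-copyable pool; not the PR's class.
class DemoPool {
public:
    explicit DemoPool(std::string name) : _name(std::move(name)) {}
    DemoPool(const DemoPool&) = delete;
    DemoPool& operator=(const DemoPool&) = delete;
    const std::string& name() const { return _name; }

private:
    std::string _name;
};

// Hypothetical registry: the map owns each pool through unique_ptr, so
// destroying the registry destroys the pools without a manual delete loop.
class DemoPoolRegistry {
public:
    // Returns the new pool, or nullptr if the name is already registered.
    DemoPool* Add(const std::string& name) {
        assert(!name.empty());
        auto result = _pools.emplace(
            name, std::unique_ptr<DemoPool>(new DemoPool(name)));
        return result.second ? result.first->second.get() : nullptr;
    }

    // Returns the pool registered under name, or nullptr if there is none.
    DemoPool* Find(const std::string& name) const {
        auto it = _pools.find(name);
        return it == _pools.end() ? nullptr : it->second.get();
    }

    std::size_t size() const { return _pools.size(); }

private:
    std::unordered_map<std::string, std::unique_ptr<DemoPool>> _pools;
};
```

The pointers returned by `Add` and `Find` are non-owning and stay valid until the registry is destroyed, because rehashing an `std::unordered_map` does not move the pointed-to objects.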
Re: [PR] add usercode thread pool for method (brpc)
chenBright commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1525594120

> std::unordered_map

std::unordered_map also requires the value to be copyable, doesn't it?
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1525565132

> > > I have another question: this ThreadPool is non-copyable, so for now only a pointer to it can be put into the FlatMap. Any good suggestions for making this more elegant?
> >
> > @chenBright Any suggestions on this? The map stores raw pointers, so the destructor has to iterate over it and call delete manually, which does not feel very elegant.
>
> If you don't want to delete them yourself, use smart pointers. There is no particularly good approach; let's see whether anyone else has suggestions.

Putting unique_ptr into FlatMap doesn't seem to work either. Or should we drop FlatMap and use std::unordered_map?
Re: [PR] add usercode thread pool for method (brpc)
chenBright commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1525554441

> > I have another question: this ThreadPool is non-copyable, so for now only a pointer to it can be put into the FlatMap. Any good suggestions for making this more elegant?
>
> @chenBright Any suggestions on this? The map stores raw pointers, so the destructor has to iterate over it and call delete manually, which does not feel very elegant.

If you don't want to delete them yourself, use smart pointers. There is no particularly good approach; let's see whether anyone else has suggestions.
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1525527773

> I have another question: this ThreadPool is non-copyable, so for now only a pointer to it can be put into the FlatMap. Any good suggestions for making this more elegant?

@chenBright Any suggestions on this? The map stores raw pointers, so the destructor has to iterate over it and call delete manually, which does not feel very elegant.
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1525526114

> > > Do we need to consider how the thread pool exits?
> >
> > This part follows the implementation of the usercode backup pool. There is a comment in it, so it seems I have to handle this the same way here.
> >
> > int UserCodeBackupPool::Init() {
> >     // Like bthread workers, these threads never quit (to avoid potential hang
> >     // during termination of program).
> >     for (int i = 0; i < FLAGS_usercode_backup_threads; ++i) {
> >         pthread_t th;
> >         if (pthread_create(&th, NULL, UserCodeRunner, this) != 0) {
> >             LOG(ERROR) << "Fail to create UserCodeRunner";
> >             return -1;
> >         }
> >     }
> >     return 0;
> > }
>
> UserCodeBackupPool is used inside the framework and is never destructed while the process is running.
>
> ```c++
> static UserCodeBackupPool* s_usercode_pool = NULL;
> ```
>
> The UserCodeThreadPool in this PR is meant to be used by users. It may be destructed while the threads in the pool have not exited, which can crash.
>
> Wouldn't it be more reasonable to do a stop and join in the UserCodeThreadPool destructor?

From the code, Server::Join() should always be called when the Server exits. Once that function returns, there should be no new requests. Then, when the Server destructor destroys _thread_pool_map, stop and join is called inside UserCodeThreadPool. I think that is reasonable. If UserCodeBackupPool were destructed inside Server, it would probably cause a deadlock; UserCodeThreadPool should not have that problem.
Re: [PR] add usercode thread pool for method (brpc)
chenBright commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1525295355

> > Do we need to consider how the thread pool exits?
>
> This part follows the implementation of the usercode backup pool. There is a comment in it, so it seems I have to handle this the same way here.
>
> int UserCodeBackupPool::Init() {
>     // Like bthread workers, these threads never quit (to avoid potential hang
>     // during termination of program).
>     for (int i = 0; i < FLAGS_usercode_backup_threads; ++i) {
>         pthread_t th;
>         if (pthread_create(&th, NULL, UserCodeRunner, this) != 0) {
>             LOG(ERROR) << "Fail to create UserCodeRunner";
>             return -1;
>         }
>     }
>     return 0;
> }

UserCodeBackupPool is used inside the framework and is never destructed while the process is running.

```c++
static UserCodeBackupPool* s_usercode_pool = NULL;
```

The UserCodeThreadPool in this PR is meant to be used by users. It may be destructed while the threads in the pool have not exited, which can crash.

Wouldn't it be more reasonable to do a stop and join in the UserCodeThreadPool destructor?
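To make the stop-and-join suggestion concrete, here is a minimal sketch of a worker whose destructor stops the loop and joins the thread, so the thread can never outlive the object that owns its queue. `DemoWorker` is a hypothetical class written for this illustration, not the PR's `UserCodeThreadWorker`. One deliberate difference, stated as an assumption: this sketch drains tasks that are already queued before exiting, whereas the loop quoted elsewhere in this review breaks out as soon as the running flag is cleared.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-thread worker, for illustration only.
class DemoWorker {
public:
    DemoWorker() : _running(true), _thread(&DemoWorker::Loop, this) {}
    ~DemoWorker() { StopAndJoin(); }
    DemoWorker(const DemoWorker&) = delete;
    DemoWorker& operator=(const DemoWorker&) = delete;

    // Queues a task. Returns false if the worker has already been stopped.
    bool Submit(std::function<void()> task) {
        assert(task != nullptr);
        {
            std::lock_guard<std::mutex> lk(_mutex);
            if (!_running) return false;
            _queue.push_back(std::move(task));
        }
        _cond.notify_one();
        return true;
    }

    // Safe to call explicitly and then again from the destructor.
    // Must not be called from a task running on the worker itself.
    void StopAndJoin() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _running = false;
        }
        _cond.notify_one();
        if (_thread.joinable()) _thread.join();
    }

private:
    void Loop() {
        for (;;) {
            std::deque<std::function<void()>> batch;
            {
                std::unique_lock<std::mutex> lk(_mutex);
                _cond.wait(lk, [this] { return !_queue.empty() || !_running; });
                if (_queue.empty()) return;  // stopped and fully drained
                batch.swap(_queue);
            }
            // Run tasks outside the lock so Submit() is never blocked by them.
            for (auto& task : batch) task();
        }
    }

    std::mutex _mutex;
    std::condition_variable _cond;
    std::deque<std::function<void()>> _queue;
    bool _running;        // guarded by _mutex
    std::thread _thread;  // declared last: starts after the members above exist
};
```

Keeping the running flag under the mutex, instead of using a relaxed atomic, makes the wake-up condition easy to reason about: the flag and the queue are always observed together.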
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1524495658

I have another question: this ThreadPool is non-copyable, so for now only a pointer to it can be put into the FlatMap. Any good suggestions for making this more elegant?
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1524494146

> Do we need to consider how the thread pool exits?

This part follows the implementation of the usercode backup pool. That code carries a comment, so it seems I have to handle it the same way here.

    int UserCodeBackupPool::Init() {
        // Like bthread workers, these threads never quit (to avoid potential hang
        // during termination of program).
        for (int i = 0; i < FLAGS_usercode_backup_threads; ++i) {
            pthread_t th;
            if (pthread_create(&th, NULL, UserCodeRunner, this) != 0) {
                LOG(ERROR) << "Fail to create UserCodeRunner";
                return -1;
            }
        }
        return 0;
    }
Re: [PR] add usercode thread pool for method (brpc)
wwbmmm commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1522947950 This PR conflicts with the main branch.
Re: [PR] add usercode thread pool for method (brpc)
chenBright commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1522840023 Do we need to consider how the thread pool exits?
Re: [PR] add usercode thread pool for method (brpc)
yanglimingcn commented on PR #2215: URL: https://github.com/apache/brpc/pull/2215#issuecomment-1515725562 @wwbmmm Could you review this when you have time?