SpriCoder commented on code in PR #17800: URL: https://github.com/apache/iotdb/pull/17800#discussion_r3332838040
########## iotdb-client/client-cpp/src/main/SessionPool.cpp: ########## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "SessionPool.h" + +void PooledSession::reset() { + if (session_ && pool_ != nullptr) { + pool_->putBack(session_, broken_); + } + pool_ = nullptr; + session_ = nullptr; + broken_ = false; +} + +SessionPool::SessionPool(std::string host, int rpcPort, std::string username, std::string password, + size_t maxSize) + : host_(std::move(host)), rpcPort_(rpcPort), username_(std::move(username)), + password_(std::move(password)), maxSize_(maxSize) { + if (maxSize_ == 0) { + throw IoTDBException("SessionPool maxSize must be greater than 0."); + } +} + +SessionPool::SessionPool(std::vector<std::string> nodeUrls, std::string username, + std::string password, size_t maxSize) + : rpcPort_(AbstractSessionBuilder::DEFAULT_RPC_PORT), nodeUrls_(std::move(nodeUrls)), + username_(std::move(username)), password_(std::move(password)), maxSize_(maxSize) { + if (maxSize_ == 0) { + throw IoTDBException("SessionPool maxSize must be greater than 0."); + } +} + +SessionPool::~SessionPool() { + try { + close(); + } catch (const std::exception& e) { + log_debug(std::string("SessionPool::~SessionPool(), ") + e.what()); + } +} + +SessionPool& SessionPool::setFetchSize(int fetchSize) { + fetchSize_ = fetchSize; + return *this; +} + +SessionPool& SessionPool::setZoneId(std::string zoneId) { + zoneId_ = std::move(zoneId); + return *this; +} + +SessionPool& SessionPool::setSqlDialect(std::string sqlDialect) { + sqlDialect_ = std::move(sqlDialect); + return *this; +} + +SessionPool& SessionPool::setDatabase(std::string database) { + database_ = std::move(database); + return *this; +} + +SessionPool& SessionPool::setEnableRedirection(bool enable) { + enableRedirection_ = enable; + return *this; +} + +SessionPool& SessionPool::setEnableAutoFetch(bool enable) { + enableAutoFetch_ = enable; + return *this; +} + +SessionPool& SessionPool::setEnableRPCCompression(bool enable) { + enableRPCCompression_ = enable; + return *this; +} + +SessionPool& SessionPool::setConnectTimeoutMs(int connectTimeoutMs) { + connectTimeoutMs_ = connectTimeoutMs; + return *this; +} + +SessionPool& SessionPool::setWaitToGetSessionTimeoutMs(int64_t timeoutMs) { + waitTimeoutMs_ = timeoutMs; + return *this; +} + +SessionPool& SessionPool::setUseSSL(bool useSSL) { + useSSL_ = useSSL; + return *this; +} + +SessionPool& SessionPool::setTrustCertFilePath(std::string path) { + trustCertFilePath_ = std::move(path); + return *this; +} + +std::shared_ptr<Session> SessionPool::constructNewSession() { + AbstractSessionBuilder builder; + builder.host = host_; + builder.rpcPort = rpcPort_; + builder.nodeUrls = nodeUrls_; + builder.username = username_; + builder.password = password_; + builder.zoneId = zoneId_; + builder.fetchSize = fetchSize_; + builder.sqlDialect = sqlDialect_; + builder.database = database_; + builder.enableAutoFetch = enableAutoFetch_; + builder.enableRedirections = enableRedirection_; + builder.enableRPCCompression = enableRPCCompression_; + builder.connectTimeoutMs = connectTimeoutMs_; + builder.useSSL = useSSL_; + builder.trustCertFilePath = trustCertFilePath_; + + auto session = std::make_shared<Session>(&builder); + session->open(enableRPCCompression_, connectTimeoutMs_); + return session; +} + +std::shared_ptr<Session> SessionPool::acquire(int64_t timeoutMs) { + const int64_t effectiveTimeout = timeoutMs > 0 ? timeoutMs : waitTimeoutMs_; + std::unique_lock<std::mutex> lock(mutex_); + const auto deadline = + std::chrono::steady_clock::now() + std::chrono::milliseconds(effectiveTimeout); + + while (true) { + if (closed_) { + throw IoTDBException("SessionPool is closed."); + } + if (!idleQueue_.empty()) { + auto session = idleQueue_.front(); + idleQueue_.pop_front(); + return session; + } + if (size_ < maxSize_) { + // Reserve a slot, then build the connection outside the lock so other + // borrowers are not blocked by network/handshake latency. + ++size_; + lock.unlock(); + std::shared_ptr<Session> session; + try { + session = constructNewSession(); Review Comment: Good catch, that was a real race. Fixed in 7d5fa7e: after `constructNewSession()` returns I now re-acquire the lock and re-check `closed_`. If the pool was closed while the connection was being built, the slot is released (`--size_`), the session is torn down outside the lock, and `acquire()` throws `IoTDBException("SessionPool is closed.")` instead of handing it out. The reserved slot is also accounted for correctly: `close()` only subtracts the idle count, and this in-flight slot is decremented on the closed path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
