hongzhi-gao commented on code in PR #17800: URL: https://github.com/apache/iotdb/pull/17800#discussion_r3332508395
########## iotdb-client/client-cpp/src/main/SessionPool.cpp: ########## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "SessionPool.h" + +void PooledSession::reset() { + if (session_ && pool_ != nullptr) { + pool_->putBack(session_, broken_); + } + pool_ = nullptr; + session_ = nullptr; + broken_ = false; +} + +SessionPool::SessionPool(std::string host, int rpcPort, std::string username, std::string password, + size_t maxSize) + : host_(std::move(host)), rpcPort_(rpcPort), username_(std::move(username)), + password_(std::move(password)), maxSize_(maxSize) { + if (maxSize_ == 0) { + throw IoTDBException("SessionPool maxSize must be greater than 0."); + } +} + +SessionPool::SessionPool(std::vector<std::string> nodeUrls, std::string username, + std::string password, size_t maxSize) + : rpcPort_(AbstractSessionBuilder::DEFAULT_RPC_PORT), nodeUrls_(std::move(nodeUrls)), + username_(std::move(username)), password_(std::move(password)), maxSize_(maxSize) { + if (maxSize_ == 0) { + throw IoTDBException("SessionPool maxSize must be greater than 0."); + } +} + +SessionPool::~SessionPool() { + try { + close(); + } catch (const std::exception& e) { + log_debug(std::string("SessionPool::~SessionPool(), ") + e.what()); + } +} + +SessionPool& SessionPool::setFetchSize(int fetchSize) { + fetchSize_ = fetchSize; + return *this; +} + +SessionPool& SessionPool::setZoneId(std::string zoneId) { + zoneId_ = std::move(zoneId); + return *this; +} + +SessionPool& SessionPool::setSqlDialect(std::string sqlDialect) { + sqlDialect_ = std::move(sqlDialect); + return *this; +} + +SessionPool& SessionPool::setDatabase(std::string database) { + database_ = std::move(database); + return *this; +} + +SessionPool& SessionPool::setEnableRedirection(bool enable) { + enableRedirection_ = enable; + return *this; +} + +SessionPool& SessionPool::setEnableAutoFetch(bool enable) { + enableAutoFetch_ = enable; + return *this; +} + +SessionPool& SessionPool::setEnableRPCCompression(bool enable) { + enableRPCCompression_ = enable; + return *this; +} + +SessionPool& SessionPool::setConnectTimeoutMs(int connectTimeoutMs) { + connectTimeoutMs_ = connectTimeoutMs; + return *this; +} + +SessionPool& SessionPool::setWaitToGetSessionTimeoutMs(int64_t timeoutMs) { + waitTimeoutMs_ = timeoutMs; + return *this; +} + +SessionPool& SessionPool::setUseSSL(bool useSSL) { + useSSL_ = useSSL; + return *this; +} + +SessionPool& SessionPool::setTrustCertFilePath(std::string path) { + trustCertFilePath_ = std::move(path); + return *this; +} + +std::shared_ptr<Session> SessionPool::constructNewSession() { + AbstractSessionBuilder builder; + builder.host = host_; + builder.rpcPort = rpcPort_; + builder.nodeUrls = nodeUrls_; + builder.username = username_; + builder.password = password_; + builder.zoneId = zoneId_; + builder.fetchSize = fetchSize_; + builder.sqlDialect = sqlDialect_; + builder.database = database_; + builder.enableAutoFetch = enableAutoFetch_; + builder.enableRedirections = enableRedirection_; + builder.enableRPCCompression = enableRPCCompression_; + builder.connectTimeoutMs = connectTimeoutMs_; + builder.useSSL = useSSL_; + builder.trustCertFilePath = trustCertFilePath_; + + auto session = std::make_shared<Session>(&builder); + session->open(enableRPCCompression_, connectTimeoutMs_); + return session; +} + +std::shared_ptr<Session> SessionPool::acquire(int64_t timeoutMs) { + const int64_t effectiveTimeout = timeoutMs > 0 ? timeoutMs : waitTimeoutMs_; + std::unique_lock<std::mutex> lock(mutex_); + const auto deadline = + std::chrono::steady_clock::now() + std::chrono::milliseconds(effectiveTimeout); + + while (true) { + if (closed_) { + throw IoTDBException("SessionPool is closed."); + } + if (!idleQueue_.empty()) { + auto session = idleQueue_.front(); + idleQueue_.pop_front(); + return session; + } + if (size_ < maxSize_) { + // Reserve a slot, then build the connection outside the lock so other + // borrowers are not blocked by network/handshake latency. + ++size_; + lock.unlock(); + std::shared_ptr<Session> session; + try { + session = constructNewSession(); Review Comment: If another thread calls close() while that connection is being built, closed_ becomes true, but the waiter still hands out a brand-new session after open() completes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
