jt2594838 commented on code in PR #15439:
URL: https://github.com/apache/iotdb/pull/15439#discussion_r2078780125


##########
iotdb-client/client-cpp/src/main/Common.cc:
##########
@@ -0,0 +1,148 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Common.h"
+
+void RpcUtils::verifySuccess(const TSStatus &status) {
+    if (status.code == TSStatusCode::MULTIPLE_ERROR) {
+        verifySuccess(status.subStatus);
+        return;
+    }
+    if (status.code != TSStatusCode::SUCCESS_STATUS
+        && status.code != TSStatusCode::REDIRECTION_RECOMMEND) {
+        throw ExecutionException(to_string(status.code) + ": " + 
status.message, status);
+    }
+}
+
+void RpcUtils::verifySuccessWithRedirection(const TSStatus &status) {
+    verifySuccess(status);
+    if (status.__isset.redirectNode) {
+        throw RedirectException(to_string(status.code) + ": " + 
status.message, status.redirectNode);
+    }
+    if (status.__isset.subStatus) {
+        auto statusSubStatus = status.subStatus;
+        vector<TEndPoint> endPointList(statusSubStatus.size());
+        int count = 0;
+        for (TSStatus subStatus : statusSubStatus) {
+            if (subStatus.__isset.redirectNode) {
+                endPointList[count++] = subStatus.redirectNode;
+            } else {
+                TEndPoint endPoint;
+                endPointList[count++] = endPoint;
+            }
+        }
+        if (!endPointList.empty()) {
+            throw RedirectException(to_string(status.code) + ": " + 
status.message, endPointList);
+        }
+    }
+}
+
+void RpcUtils::verifySuccessWithRedirectionForMultiDevices(const TSStatus 
&status, vector<string> devices) {
+    verifySuccess(status);
+
+    if (status.code == TSStatusCode::MULTIPLE_ERROR
+    || status.code == TSStatusCode::REDIRECTION_RECOMMEND) {
+        map<string, TEndPoint> deviceEndPointMap;
+        vector<TSStatus> statusSubStatus;
+        for (int i = 0; i < statusSubStatus.size(); i++) {
+            TSStatus subStatus = statusSubStatus[i];
+            if (subStatus.__isset.redirectNode) {
+                deviceEndPointMap.insert(make_pair(devices[i], 
subStatus.redirectNode));
+            }
+        }
+        throw RedirectException(to_string(status.code) + ": " + 
status.message, deviceEndPointMap);
+    }
+
+    if (status.__isset.redirectNode) {
+        throw RedirectException(to_string(status.code) + ": " + 
status.message, status.redirectNode);
+    }
+    if (status.__isset.subStatus) {
+        auto statusSubStatus = status.subStatus;
+        vector<TEndPoint> endPointList(statusSubStatus.size());
+        int count = 0;
+        for (TSStatus subStatus : statusSubStatus) {
+            if (subStatus.__isset.redirectNode) {
+                endPointList[count++] = subStatus.redirectNode;
+            } else {
+                TEndPoint endPoint;
+                endPointList[count++] = endPoint;
+            }
+        }
+        if (!endPointList.empty()) {
+            throw RedirectException(to_string(status.code) + ": " + 
status.message, endPointList);
+        }
+    }
+}
+
+void RpcUtils::verifySuccess(const vector<TSStatus> &statuses) {
+    for (const TSStatus &status: statuses) {
+        if (status.code != TSStatusCode::SUCCESS_STATUS) {
+            throw BatchExecutionException(status.message, statuses);
+        }
+    }
+}
+
+TSStatus RpcUtils::getStatus(TSStatusCode::TSStatusCode tsStatusCode) {
+    TSStatus status;
+    status.__set_code(tsStatusCode);
+    return status;
+}
+
+TSStatus RpcUtils::getStatus(int code, const string &message) {
+    TSStatus status;
+    status.__set_code(code);
+    status.__set_message(message);
+    return status;
+}
+
+shared_ptr<TSExecuteStatementResp> 
RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode) {
+    TSStatus status = getStatus(tsStatusCode);
+    return getTSExecuteStatementResp(status);
+}
+
+shared_ptr<TSExecuteStatementResp>
+RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, 
const string &message) {
+    TSStatus status = getStatus(tsStatusCode, message);
+    return getTSExecuteStatementResp(status);
+}
+
+shared_ptr<TSExecuteStatementResp> RpcUtils::getTSExecuteStatementResp(const 
TSStatus &status) {
+    shared_ptr<TSExecuteStatementResp> resp(new TSExecuteStatementResp());
+    TSStatus tsStatus(status);
+    resp->status = tsStatus;
+    return resp;
+}

Review Comment:
   Is it possible to just write 
   `resp->_set_status(status)`
   which avoids one temperary variable.



##########
iotdb-client/client-cpp/src/main/Session.cpp:
##########
@@ -255,7 +167,21 @@ void Tablet::setAligned(bool isAligned) {
     this->isAligned = isAligned;
 }
 
-string SessionUtils::getTime(const Tablet &tablet) {
+std::shared_ptr<storage::IDeviceID> Tablet::getDeviceID(int row) {
+    std::vector<std::string> id_array(idColumnIndexes.size() + 1);
+    size_t idArrayIdx = 0;
+    id_array[idArrayIdx++] = this->deviceId;
+    for (auto idColumnIndex : idColumnIndexes) {
+        void* strPtr = getValue(idColumnIndex, row, TSDataType::TEXT);
+        if (!strPtr) {
+            throw std::runtime_error("Unsupported data type: " + 
std::to_string(TSDataType::TEXT));
+        }
+        id_array[idArrayIdx++] = *static_cast<std::string*>(strPtr);

Review Comment:
   Check this, a segment in a DeviceId can be null (if it is not at tail), like 
("table1", null, "tag2").
   And the exception message is not  consistent with the error.



##########
iotdb-client/client-cpp/src/main/Session.cpp:
##########
@@ -942,325 +955,273 @@ void Session::close() {
         return;
     }
     isClosed = true;
-
-    bool needThrowException = false;
-    string errMsg;
-    try {
-        TSCloseSessionReq req;
-        req.__set_sessionId(sessionId);
-        TSStatus tsStatus;
-        client->closeSession(tsStatus, req);
-    } catch (const TTransportException &e) {
-        log_debug(e.what());
-        throw IoTDBConnectionException(e.what());
-    } catch (const exception &e) {
-        log_debug(e.what());
-        errMsg = errMsg + "Session::close() client->closeSession() error, 
maybe remote server is down. " + e.what() + "\n" ;
-        needThrowException = true;
-    }
-
-    try {
-        if (transport->isOpen()) {
-            transport->close();
-        }
-    }
-    catch (const exception &e) {
-        log_debug(e.what());
-        errMsg = errMsg + "Session::close() transport->close() error. " + 
e.what() + "\n" ;
-        needThrowException = true;
-    }
-
-    if (needThrowException) {
-        throw IoTDBException(errMsg);
-    }
 }
 
 
-void Session::insertRecord(const string &deviceId, int64_t time,
-                           const vector<string> &measurements,
-                           const vector<string> &values) {
+void Session::insertRecord(const string& deviceId, int64_t time,
+                           const vector<string>& measurements,
+                           const vector<string>& values) {
     TSInsertStringRecordReq req;
-    req.__set_sessionId(sessionId);
     req.__set_prefixPath(deviceId);
     req.__set_timestamp(time);
     req.__set_measurements(measurements);
     req.__set_values(values);
     req.__set_isAligned(false);
-    TSStatus respStatus;
     try {
-        
getSessionConnection(deviceId)->getSessionClient()->insertStringRecord(respStatus,
 req);
-        RpcUtils::verifySuccess(respStatus);
-    } catch (RedirectException& e) {
+        getSessionConnection(deviceId)->insertStringRecord(req);
+    }
+    catch (RedirectException& e) {
         handleRedirection(deviceId, e.endPoint);
-    } catch (const IoTDBConnectionException &e) {
+    } catch (const IoTDBConnectionException& e) {
         if (enableRedirection && deviceIdToEndpoint.find(deviceId) != 
deviceIdToEndpoint.end()) {
             deviceIdToEndpoint.erase(deviceId);
             try {
-                
defaultSessionConnection->getSessionClient()->insertStringRecord(respStatus, 
req);
-            } catch (RedirectException& e) {
+                defaultSessionConnection->insertStringRecord(req);
+            }
+            catch (RedirectException& e) {
             }
-        } else {
-            throw IoTDBConnectionException(e.what());
         }
-    } catch (const TTransportException &e) {
-        log_debug(e.what());
-        throw IoTDBConnectionException(e.what());
-    } catch (const IoTDBException &e) {
-        log_debug(e.what());
-        throw;
-    } catch (const exception &e) {
-        log_debug(e.what());
-        throw IoTDBException(e.what());
+        else {
+            throw e;
+        }
     }
 }
 
-void Session::insertRecord(const string &prefixPath, int64_t time,
-                           const vector<string> &measurements,
-                           const vector<TSDataType::TSDataType> &types,
-                           const vector<char *> &values) {
+void Session::insertRecord(const string& deviceId, int64_t time,
+                           const vector<string>& measurements,
+                           const vector<TSDataType::TSDataType>& types,
+                           const vector<char*>& values) {
     TSInsertRecordReq req;
-    req.__set_sessionId(sessionId);
-    req.__set_prefixPath(prefixPath);
+    req.__set_prefixPath(deviceId);
     req.__set_timestamp(time);
     req.__set_measurements(measurements);
     string buffer;
     putValuesIntoBuffer(types, values, buffer);
     req.__set_values(buffer);
     req.__set_isAligned(false);
-    TSStatus respStatus;
     try {
-        client->insertRecord(respStatus, req);
-        RpcUtils::verifySuccess(respStatus);
-    } catch (const TTransportException &e) {
-        log_debug(e.what());
-        throw IoTDBConnectionException(e.what());
-    } catch (const IoTDBException &e) {
-        log_debug(e.what());
-        throw;
-    } catch (const exception &e) {
-        log_debug(e.what());
-        throw IoTDBException(e.what());
+        getSessionConnection(deviceId)->insertRecord(req);
+    }
+    catch (RedirectException& e) {
+        handleRedirection(deviceId, e.endPoint);
+    } catch (const IoTDBConnectionException& e) {
+        if (enableRedirection && deviceIdToEndpoint.find(deviceId) != 
deviceIdToEndpoint.end()) {
+            deviceIdToEndpoint.erase(deviceId);
+            try {
+                defaultSessionConnection->insertRecord(req);
+            }
+            catch (RedirectException& e) {
+            }

Review Comment:
   Should not handle this?



##########
iotdb-client/client-cpp/src/main/Session.cpp:
##########
@@ -833,16 +802,122 @@ void Session::initNodesSupplier() {
     endPoints.emplace_back(endPoint);
     if (enableAutoFetch) {
         nodesSupplier = NodesSupplier::create(endPoints, username, password);
-    } else {
+    }
+    else {
         nodesSupplier = make_shared<StaticNodesSupplier>(endPoints);
     }
 }
 
 void Session::initDefaultSessionConnection() {
     defaultEndPoint.__set_ip(host);
     defaultEndPoint.__set_port(rpcPort);
-    defaultSessionConnection = make_shared<SessionConnection>(this, 
defaultEndPoint, zoneId, nodesSupplier, 60, 500,
-            sqlDialect, database);
+    defaultSessionConnection = make_shared<SessionConnection>(this, 
defaultEndPoint, zoneId, nodesSupplier, fetchSize,
+                                                              60, 500,
+                                                              sqlDialect, 
database);
+}
+
+void Session::insertStringRecordsWithLeaderCache(vector<string> deviceIds, 
vector<int64_t> times,
+                                                 vector<vector<string>> 
measurementsList,
+                                                 vector<vector<string>> 
valuesList, bool isAligned) {
+    std::unordered_map<std::shared_ptr<SessionConnection>, 
TSInsertStringRecordsReq> recordsGroup;
+    for (int i = 0; i < deviceIds.size(); i++) {
+        auto connection = getSessionConnection(deviceIds[i]);
+        TSInsertStringRecordsReq request;
+        request.__set_sessionId(connection->sessionId);
+        request.__set_prefixPaths(deviceIds);
+        request.__set_timestamps(times);
+        request.__set_measurementsList(measurementsList);
+        request.__set_valuesList(valuesList);
+        request.__set_isAligned(isAligned);
+        recordsGroup.insert(make_pair(connection, request));
+    }
+    std::function<void(std::shared_ptr<SessionConnection>, const 
TSInsertStringRecordsReq&)> consumer =
+        [](const std::shared_ptr<SessionConnection>& c, const 
TSInsertStringRecordsReq& r) {
+        c->insertStringRecords(r);
+    };
+    if (recordsGroup.size() == 1) {
+        insertOnce(recordsGroup, consumer);
+    }
+    else {
+        insertByGroup(recordsGroup, consumer);
+    }
+}
+
+void Session::insertRecordsWithLeaderCache(vector<string> deviceIds, 
vector<int64_t> times,
+                                           vector<vector<string>> 
measurementsList,
+                                           const 
vector<vector<TSDataType::TSDataType>>& typesList,
+                                           vector<vector<char*>> valuesList, 
bool isAligned) {
+    std::unordered_map<std::shared_ptr<SessionConnection>, TSInsertRecordsReq> 
recordsGroup;
+    for (int i = 0; i < deviceIds.size(); i++) {
+        auto connection = getSessionConnection(deviceIds[i]);
+        TSInsertRecordsReq request;
+        request.__set_prefixPaths(deviceIds);
+        request.__set_timestamps(times);
+        request.__set_measurementsList(measurementsList);
+        vector<string> bufferList;
+        for (size_t i = 0; i < valuesList.size(); i++) {
+            string buffer;
+            putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
+            bufferList.push_back(buffer);
+        }
+        request.__set_valuesList(bufferList);
+        request.__set_isAligned(false);
+        recordsGroup.insert(make_pair(connection, request));
+    }
+    std::function<void(std::shared_ptr<SessionConnection>, const 
TSInsertRecordsReq&)> consumer =
+        [](const std::shared_ptr<SessionConnection>& c, const 
TSInsertRecordsReq& r) {
+        c->insertRecords(r);
+    };
+    if (recordsGroup.size() == 1) {
+        insertOnce(recordsGroup, consumer);
+    }
+    else {
+        insertByGroup(recordsGroup, consumer);
+    }
+}
+
+void Session::insertTabletsWithLeaderCache(unordered_map<string, Tablet*> 
tablets, bool sorted, bool isAligned) {
+    std::unordered_map<std::shared_ptr<SessionConnection>, TSInsertTabletsReq> 
tabletGroup;
+    if (tablets.empty()) {
+        throw BatchExecutionException("No tablet is inserting!");
+    }
+    auto beginIter = tablets.begin();
+    bool isFirstTabletAligned = ((*beginIter).second)->isAligned;
+    for (const auto& item : tablets) {
+        TSInsertTabletsReq request;
+        if (isFirstTabletAligned != item.second->isAligned) {
+            throw BatchExecutionException("The tablets should be all aligned 
or non-aligned!");
+        }
+        if (!checkSorted(*(item.second))) {
+            sortTablet(*(item.second));
+        }
+        request.prefixPaths.push_back(item.second->deviceId);
+        vector<string> measurements;
+        vector<int> dataTypes;
+        for (pair<string, TSDataType::TSDataType> schema : 
item.second->schemas) {
+            measurements.push_back(schema.first);
+            dataTypes.push_back(schema.second);
+        }
+        request.measurementsList.push_back(measurements);
+        request.typesList.push_back(dataTypes);
+        
request.timestampsList.push_back(move(SessionUtils::getTime(*(item.second))));
+        
request.valuesList.push_back(move(SessionUtils::getValue(*(item.second))));
+        request.sizeList.push_back(item.second->rowSize);
+        request.__set_isAligned(item.second->isAligned);
+        auto connection = getSessionConnection(item.first);
+        tabletGroup.insert(make_pair(connection, request));
+    }

Review Comment:
   If two tablets belong to the same connection, the request will be 
overwritten?
   Maybe `request` should be retrieved from tabletGroup first, if any.



##########
iotdb-client/client-cpp/src/main/Session.cpp:
##########
@@ -46,162 +47,73 @@ TSDataType::TSDataType getTSDataTypeFromString(const 
string &str) {
     return TSDataType::INVALID_DATATYPE;
 }
 
-void RpcUtils::verifySuccess(const TSStatus &status) {
-    if (status.code == TSStatusCode::MULTIPLE_ERROR) {
-        verifySuccess(status.subStatus);
-        return;
-    }
-    if (status.code != TSStatusCode::SUCCESS_STATUS
-        && status.code != TSStatusCode::REDIRECTION_RECOMMEND) {
-        throw ExecutionException(to_string(status.code) + ": " + 
status.message, status);
-    }
-}
-
-void RpcUtils::verifySuccessWithRedirection(const TSStatus &status) {
-    verifySuccess(status);
-    if (status.__isset.redirectNode) {
-        throw RedirectException(to_string(status.code) + ": " + 
status.message, status.redirectNode);
-    }
-    if (status.__isset.subStatus) {
-        auto statusSubStatus = status.subStatus;
-        vector<TEndPoint> endPointList(statusSubStatus.size());
-        int count = 0;
-        for (TSStatus subStatus : statusSubStatus) {
-            if (subStatus.__isset.redirectNode) {
-                endPointList[count++] = subStatus.redirectNode;
-            } else {
-                TEndPoint endPoint;
-                endPointList[count++] = endPoint;
-            }
-        }
-        if (!endPointList.empty()) {
-            throw RedirectException(to_string(status.code) + ": " + 
status.message, endPointList);
-        }
-    }
-}
-
-void RpcUtils::verifySuccess(const vector<TSStatus> &statuses) {
-    for (const TSStatus &status: statuses) {
-        if (status.code != TSStatusCode::SUCCESS_STATUS) {
-            throw BatchExecutionException(status.message, statuses);
-        }
-    }
-}
-
-TSStatus RpcUtils::getStatus(TSStatusCode::TSStatusCode tsStatusCode) {
-    TSStatus status;
-    status.__set_code(tsStatusCode);
-    return status;
-}
-
-TSStatus RpcUtils::getStatus(int code, const string &message) {
-    TSStatus status;
-    status.__set_code(code);
-    status.__set_message(message);
-    return status;
-}
-
-shared_ptr<TSExecuteStatementResp> 
RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode) {
-    TSStatus status = getStatus(tsStatusCode);
-    return getTSExecuteStatementResp(status);
-}
-
-shared_ptr<TSExecuteStatementResp>
-RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, 
const string &message) {
-    TSStatus status = getStatus(tsStatusCode, message);
-    return getTSExecuteStatementResp(status);
-}
-
-shared_ptr<TSExecuteStatementResp> RpcUtils::getTSExecuteStatementResp(const 
TSStatus &status) {
-    shared_ptr<TSExecuteStatementResp> resp(new TSExecuteStatementResp());
-    TSStatus tsStatus(status);
-    resp->status = tsStatus;
-    return resp;
-}
-
-shared_ptr<TSFetchResultsResp> 
RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode) {
-    TSStatus status = getStatus(tsStatusCode);
-    return getTSFetchResultsResp(status);
-}
-
-shared_ptr<TSFetchResultsResp>
-RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, const 
string &appendMessage) {
-    TSStatus status = getStatus(tsStatusCode, appendMessage);
-    return getTSFetchResultsResp(status);
-}
-
-shared_ptr<TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(const TSStatus 
&status) {
-    shared_ptr<TSFetchResultsResp> resp(new TSFetchResultsResp());
-    TSStatus tsStatus(status);
-    resp->__set_status(tsStatus);
-    return resp;
-}
-
 void Tablet::createColumns() {
     for (size_t i = 0; i < schemas.size(); i++) {
         TSDataType::TSDataType dataType = schemas[i].second;
         switch (dataType) {
-            case TSDataType::BOOLEAN:
-                values[i] = new bool[maxRowNumber];
-                break;
-            case TSDataType::INT32:
-                values[i] = new int[maxRowNumber];
-                break;
-            case TSDataType::INT64:
-                values[i] = new int64_t[maxRowNumber];
-                break;
-            case TSDataType::FLOAT:
-                values[i] = new float[maxRowNumber];
-                break;
-            case TSDataType::DOUBLE:
-                values[i] = new double[maxRowNumber];
-                break;
-            case TSDataType::TEXT:
-                values[i] = new string[maxRowNumber];
-                break;
-            default:
-                throw UnSupportedDataTypeException(string("Data type ") + 
to_string(dataType) + " is not supported.");
+        case TSDataType::BOOLEAN:
+            values[i] = new bool[maxRowNumber];
+            break;
+        case TSDataType::INT32:
+            values[i] = new int[maxRowNumber];
+            break;
+        case TSDataType::INT64:
+            values[i] = new int64_t[maxRowNumber];
+            break;
+        case TSDataType::FLOAT:
+            values[i] = new float[maxRowNumber];
+            break;
+        case TSDataType::DOUBLE:
+            values[i] = new double[maxRowNumber];
+            break;
+        case TSDataType::TEXT:
+            values[i] = new string[maxRowNumber];
+            break;
+        default:
+            throw UnSupportedDataTypeException(string("Data type ") + 
to_string(dataType) + " is not supported.");
         }
     }
 }
 
 void Tablet::deleteColumns() {
     for (size_t i = 0; i < schemas.size(); i++) {
+        if (values[i]) continue;

Review Comment:
   Should be `!values[i]`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to