HTHou commented on code in PR #13550:
URL: https://github.com/apache/iotdb/pull/13550#discussion_r1776295219
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java:
##########
@@ -2697,6 +2790,111 @@ public void insertRelationalTablet(Tablet tablet)
insertRelationalTablet(tablet, false);
}
+ private void insertRelationalTabletWithLeaderCache(Tablet tablet)
+ throws IoTDBConnectionException, StatementExecutionException {
+ Map<SessionConnection, Tablet> relationalTabletGroup = new HashMap<>();
+ if (tableModelDeviceIdToEndpoint.isEmpty()) {
+ relationalTabletGroup.put(defaultSessionConnection, tablet);
+ } else {
+ for (int i = 0; i < tablet.rowSize; i++) {
+ IDeviceID iDeviceID = tablet.getDeviceID(i);
+ final SessionConnection connection = getSessionConnection(iDeviceID);
+ int finalI = i;
+ relationalTabletGroup.compute(
+ connection,
+ (k, v) -> {
+ if (v == null) {
+ v =
+ new Tablet(
+ tablet.getTableName(),
+ tablet.getSchemas(),
+ tablet.getColumnTypes(),
+ tablet.rowSize);
+ }
+ for (int j = 0; j < v.getSchemas().size(); j++) {
+ v.addValue(
+ v.getSchemas().get(j).getMeasurementId(),
+ v.rowSize,
+ tablet.getValue(finalI, j));
+ }
+ v.addTimestamp(v.rowSize, tablet.timestamps[finalI]);
+ v.rowSize++;
+ return v;
+ });
+ }
+ }
+ insertRelationalTabletByGroup(relationalTabletGroup);
+ }
+
+ @SuppressWarnings({
+ "squid:S3776"
+ }) // ignore Cognitive Complexity of methods should not be too high
+ private <T> void insertRelationalTabletByGroup(
+ Map<SessionConnection, Tablet> relationalTabletGroup)
+ throws IoTDBConnectionException, StatementExecutionException {
+ List<CompletableFuture<Void>> completableFutures =
+ relationalTabletGroup.entrySet().stream()
+ .map(
+ entry -> {
+ SessionConnection connection = entry.getKey();
+ Tablet subTablet = entry.getValue();
+ return CompletableFuture.runAsync(
+ () -> {
+ TSInsertTabletReq request =
genTSInsertTabletReq(subTablet, false, false);
+ request.setWriteToTable(true);
+ request.setColumnCategories(
+ subTablet.getColumnTypes().stream()
+ .map(t -> (byte) t.ordinal())
+ .collect(Collectors.toList()));
+ InsertConsumer<TSInsertTabletReq> insertConsumer =
+ SessionConnection::insertTablet;
+ try {
+ insertConsumer.insert(connection, request);
+ } catch (RedirectException e) {
+ List<TEndPoint> endPointList = e.getEndPointList();
+ Map<IDeviceID, TEndPoint> endPointMap = new
HashMap<>();
+ for (int i = 0; i < endPointList.size(); i++) {
+ if (endPointList.get(i) != null) {
+ endPointMap.put(subTablet.getDeviceID(i),
endPointList.get(i));
+ }
+ }
+ endPointMap.forEach(this::handleRedirection);
+ } catch (StatementExecutionException e) {
+ throw new CompletionException(e);
+ } catch (IoTDBConnectionException e) {
+ // remove the broken session
+ removeBrokenSessionConnection(connection);
+ try {
+ insertConsumer.insert(defaultSessionConnection,
request);
+ } catch (IoTDBConnectionException |
StatementExecutionException ex) {
+ throw new CompletionException(ex);
+ } catch (RedirectException ignored) {
+ }
+ }
+ },
+ OPERATION_EXECUTOR);
+ })
+ .collect(Collectors.toList());
+
+ StringBuilder errMsgBuilder = new StringBuilder();
+ for (CompletableFuture<Void> completableFuture : completableFutures) {
+ try {
+ completableFuture.join();
+ } catch (CompletionException completionException) {
+ Throwable cause = completionException.getCause();
+ logger.error("Meet error when async insert!", cause);
+ if (cause instanceof IoTDBConnectionException) {
+ throw (IoTDBConnectionException) cause;
+ } else {
+ errMsgBuilder.append(cause.getMessage());
Review Comment:
Makes sense.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]