jt2594838 commented on code in PR #16062:
URL: https://github.com/apache/iotdb/pull/16062#discussion_r2244245280


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java:
##########
@@ -90,11 +93,43 @@ public synchronized void onSuccess() {
     binaryDataBases.clear();
     insertNodeDataBases.clear();
     tabletDataBases.clear();
+    tableModelTabletMap.clear();
 
     pipe2BytesAccumulated.clear();
   }
 
   public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException {
+    for (final Map.Entry<String, Map<String, Pair<Integer, List<Tablet>>>> 
insertTablets :
+        tableModelTabletMap.entrySet()) {
+      final String databaseName = insertTablets.getKey();
+      for (final Map.Entry<String, Pair<Integer, List<Tablet>>> tabletEntry :
+          insertTablets.getValue().entrySet()) {
+        Tablet batchTablet = null;
+        for (final Tablet tablet : tabletEntry.getValue().getRight()) {
+          if (Objects.isNull(batchTablet)) {
+            batchTablet = tablet;
+          } else if (!batchTablet.append(tablet, 
tabletEntry.getValue().getLeft())) {
+            throw new PipeException(
+                "Failed to merge tablets due to inconsistent schema, database: 
"
+                    + databaseName
+                    + ", tableName: "
+                    + tablet.getTableName());
+          }

Review Comment:
   You should send this tablet alone instead of throwing an exception.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java:
##########
@@ -90,11 +93,43 @@ public synchronized void onSuccess() {
     binaryDataBases.clear();
     insertNodeDataBases.clear();
     tabletDataBases.clear();
+    tableModelTabletMap.clear();
 
     pipe2BytesAccumulated.clear();
   }
 
   public PipeTransferTabletBatchReqV2 toTPipeTransferReq() throws IOException {
+    for (final Map.Entry<String, Map<String, Pair<Integer, List<Tablet>>>> 
insertTablets :
+        tableModelTabletMap.entrySet()) {
+      final String databaseName = insertTablets.getKey();
+      for (final Map.Entry<String, Pair<Integer, List<Tablet>>> tabletEntry :
+          insertTablets.getValue().entrySet()) {
+        Tablet batchTablet = null;
+        for (final Tablet tablet : tabletEntry.getValue().getRight()) {
+          if (Objects.isNull(batchTablet)) {
+            batchTablet = tablet;
+          } else if (!batchTablet.append(tablet, 
tabletEntry.getValue().getLeft())) {
+            throw new PipeException(
+                "Failed to merge tablets due to inconsistent schema, database: 
"
+                    + databaseName
+                    + ", tableName: "
+                    + tablet.getTableName());
+          }
+        }
+        assert batchTablet != null;

Review Comment:
   Do not use assert in production code.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java:
##########
@@ -52,13 +57,12 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
   private final List<String> insertNodeDataBases = new ArrayList<>();
   private final List<String> tabletDataBases = new ArrayList<>();
 
+  private final Map<String, Map<String, Pair<Integer, List<Tablet>>>> 
tableModelTabletMap =
+      new HashMap<>();

Review Comment:
   Indicate what the keys are via comment or field name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to