VGalaxies commented on code in PR #15276:
URL: https://github.com/apache/iotdb/pull/15276#discussion_r2031370588


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java:
##########
@@ -91,178 +105,201 @@ public synchronized void close() {
     isTabletAlignedList.clear();
   }
 
-  private List<Pair<String, File>> writeTabletsToTsFiles()
-      throws IOException, WriteProcessException {
-    final Map<String, List<Tablet>> device2Tablets = new HashMap<>();
-    final Map<String, Boolean> device2Aligned = new HashMap<>();
+  private List<Pair<String, File>> writeTabletsToTsFiles() throws 
WriteProcessException {
+    final List<Pair<String, File>> sealedFiles = new ArrayList<>();
+    try (final RestorableTsFileIOWriter writer = new 
RestorableTsFileIOWriter(createFile())) {
+      writeTabletsIntoOneFile(writer);
+      sealedFiles.add(new Pair<>(null, writer.getFile()));
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Batch id = {}: Failed to write tablets into tsfile, because {}",
+          currentBatchId.get(),
+          e.getMessage(),
+          e);
+      // TODO: handle ex
+      throw new WriteProcessException(e);
+    }
+
+    return sealedFiles;
+  }
+
+  private void writeTabletsIntoOneFile(final RestorableTsFileIOWriter writer) 
throws Exception {
+    // TODO: database & region
+    final String database = "test_db";
+    final String dataRegionId = "test_dr";
+    final IMemTable memTable = new PrimitiveMemTable(database, dataRegionId);
 
-    // Sort the tablets by device id
     for (int i = 0, size = tabletList.size(); i < size; ++i) {
       final Tablet tablet = tabletList.get(i);
-      final String deviceId = tablet.getDeviceId();
-      device2Tablets.computeIfAbsent(deviceId, k -> new 
ArrayList<>()).add(tablet);
-      device2Aligned.put(deviceId, isTabletAlignedList.get(i));
-    }
 
-    // Sort the tablets by start time in each device
-    for (final List<Tablet> tablets : device2Tablets.values()) {
-      tablets.sort(
-          // Each tablet has at least one timestamp
-          Comparator.comparingLong(tablet -> tablet.getTimestamp(0)));
-    }
+      // convert date value to int
+      // refer to
+      // 
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
+      final Object[] values = tablet.getValues();
+      for (int j = 0; j < tablet.getSchemas().size(); ++j) {
+        if (Objects.equals(TSDataType.DATE, 
tablet.getSchemas().get(j).getType())) {
+          final LocalDate[] dates = ((LocalDate[]) values[j]);
+          final int[] dateValues = new int[dates.length];
+          for (int k = 0; k < Math.min(dates.length, tablet.getRowSize()); 
k++) {
+            dateValues[k] = DateUtils.parseDateExpressionToInt(dates[k]);
+          }
+          values[j] = dateValues;
+        }
+      }
 
-    // Sort the devices by device id
-    final List<String> devices = new ArrayList<>(device2Tablets.keySet());
-    devices.sort(Comparator.naturalOrder());
+      final InsertTabletNode insertTabletNode =
+          new InsertTabletNode(
+              // TODO: plan node id
+              new PlanNodeId("test_id"),
+              new PartialPath(tablet.getDeviceId()),
+              isTabletAlignedList.get(i),
+              tablet.getSchemas().stream()
+                  .map(IMeasurementSchema::getMeasurementName)
+                  .toArray(String[]::new),
+              tablet.getSchemas().stream()
+                  .map(IMeasurementSchema::getType)
+                  .toArray(TSDataType[]::new),
+              // TODO: cast
+              tablet.getSchemas().stream()
+                  .map(schema -> (MeasurementSchema) schema)
+                  .toArray(MeasurementSchema[]::new),
+              tablet.getTimestamps(),
+              tablet.getBitMaps(),
+              tablet.getValues(),
+              tablet.getRowSize());
+
+      // TODO: unused results
+      final TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
+      Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
+
+      final int loc = insertTabletNode.checkTTL(results, 
getTTL(insertTabletNode));
+      final List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs =
+          insertTabletNode.splitByDevice(loc, insertTabletNode.getRowCount());
+      int start = loc;

Review Comment:
   The code here completely refers to the method call chain of 
`org.apache.iotdb.db.storageengine.dataregion.DataRegion#insertTablet`. Some 
pruning has already been done, pending discussion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to