luoluoyuyu commented on code in PR #15276:
URL: https://github.com/apache/iotdb/pull/15276#discussion_r2030931373


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java:
##########
@@ -91,178 +105,201 @@ public synchronized void close() {
     isTabletAlignedList.clear();
   }
 
-  private List<Pair<String, File>> writeTabletsToTsFiles()
-      throws IOException, WriteProcessException {
-    final Map<String, List<Tablet>> device2Tablets = new HashMap<>();
-    final Map<String, Boolean> device2Aligned = new HashMap<>();
+  private List<Pair<String, File>> writeTabletsToTsFiles() throws 
WriteProcessException {
+    final List<Pair<String, File>> sealedFiles = new ArrayList<>();
+    try (final RestorableTsFileIOWriter writer = new 
RestorableTsFileIOWriter(createFile())) {
+      writeTabletsIntoOneFile(writer);
+      sealedFiles.add(new Pair<>(null, writer.getFile()));
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Batch id = {}: Failed to write tablets into tsfile, because {}",
+          currentBatchId.get(),
+          e.getMessage(),
+          e);
+      // TODO: handle ex
+      throw new WriteProcessException(e);
+    }
+
+    return sealedFiles;
+  }
+
+  private void writeTabletsIntoOneFile(final RestorableTsFileIOWriter writer) 
throws Exception {
+    // TODO: database & region
+    final String database = "test_db";
+    final String dataRegionId = "test_dr";
+    final IMemTable memTable = new PrimitiveMemTable(database, dataRegionId);
 
-    // Sort the tablets by device id
     for (int i = 0, size = tabletList.size(); i < size; ++i) {
       final Tablet tablet = tabletList.get(i);
-      final String deviceId = tablet.getDeviceId();
-      device2Tablets.computeIfAbsent(deviceId, k -> new 
ArrayList<>()).add(tablet);
-      device2Aligned.put(deviceId, isTabletAlignedList.get(i));
-    }
 
-    // Sort the tablets by start time in each device
-    for (final List<Tablet> tablets : device2Tablets.values()) {
-      tablets.sort(
-          // Each tablet has at least one timestamp
-          Comparator.comparingLong(tablet -> tablet.getTimestamp(0)));
-    }
+      // convert date value to int
+      // refer to
+      // 
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
+      final Object[] values = tablet.getValues();
+      for (int j = 0; j < tablet.getSchemas().size(); ++j) {
+        if (Objects.equals(TSDataType.DATE, 
tablet.getSchemas().get(j).getType())) {
+          final LocalDate[] dates = ((LocalDate[]) values[j]);
+          final int[] dateValues = new int[dates.length];
+          for (int k = 0; k < Math.min(dates.length, tablet.getRowSize()); 
k++) {
+            dateValues[k] = DateUtils.parseDateExpressionToInt(dates[k]);
+          }
+          values[j] = dateValues;
+        }
+      }
 
-    // Sort the devices by device id
-    final List<String> devices = new ArrayList<>(device2Tablets.keySet());
-    devices.sort(Comparator.naturalOrder());
+      final InsertTabletNode insertTabletNode =
+          new InsertTabletNode(
+              // TODO: plan node id
+              new PlanNodeId("test_id"),
+              new PartialPath(tablet.getDeviceId()),
+              isTabletAlignedList.get(i),
+              tablet.getSchemas().stream()
+                  .map(IMeasurementSchema::getMeasurementName)
+                  .toArray(String[]::new),
+              tablet.getSchemas().stream()
+                  .map(IMeasurementSchema::getType)
+                  .toArray(TSDataType[]::new),
+              // TODO: cast
+              tablet.getSchemas().stream()
+                  .map(schema -> (MeasurementSchema) schema)
+                  .toArray(MeasurementSchema[]::new),
+              tablet.getTimestamps(),
+              tablet.getBitMaps(),
+              tablet.getValues(),
+              tablet.getRowSize());
+
+      // TODO: unused results
+      final TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
+      Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
+
+      final int loc = insertTabletNode.checkTTL(results, 
getTTL(insertTabletNode));
+      final List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs =
+          insertTabletNode.splitByDevice(loc, insertTabletNode.getRowCount());
+      int start = loc;
+      final Map<Long, List<int[]>[]> splitInfo = new HashMap<>();
+      for (final Pair<IDeviceID, Integer> deviceEndOffsetPair : 
deviceEndOffsetPairs) {
+        final int end = deviceEndOffsetPair.getRight();
+        split(insertTabletNode, start, end, splitInfo);
+        start = end;
+      }
 
-    // Replace ArrayList with LinkedList to improve performance
-    final LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList =
-        new LinkedHashMap<>();
-    for (final String device : devices) {
-      device2TabletsLinkedList.put(device, new 
LinkedList<>(device2Tablets.get(device)));
+      doInsert(insertTabletNode, splitInfo, results, memTable);
     }
 
-    // Help GC
-    devices.clear();
-    device2Tablets.clear();
+    final MemTableFlushTask memTableFlushTask =
+        new MemTableFlushTask(memTable, writer, database, dataRegionId);
+    memTableFlushTask.syncFlushMemTable();
 
-    // Write the tablets to the tsfile device by device, and the tablets
-    // in the same device are written in order of start time. Tablets in
-    // the same device should not be written if their time ranges overlap.
-    // If overlapped, we try to write the tablets whose device id is not
-    // the same as the previous one. For the tablets not written in the
-    // previous round, we write them in a new tsfile.
-    final List<Pair<String, File>> sealedFiles = new ArrayList<>();
+    writer.endFile();
+  }
 
-    // Try making the tsfile size as large as possible
-    while (!device2TabletsLinkedList.isEmpty()) {
-      if (Objects.isNull(fileWriter)) {
-        createFileWriter();
+  private void split(
+      final InsertTabletNode insertTabletNode,
+      int loc,
+      final int endOffset,
+      final Map<Long, List<int[]>[]> splitInfo) {
+    // before is first start point
+    int before = loc;
+    final long beforeTime = insertTabletNode.getTimes()[before];
+    // before time partition
+    long beforeTimePartition = 
TimePartitionUtils.getTimePartitionId(beforeTime);
+
+    // if is sequence
+    // TODO: always un-sequence now
+    final boolean isSequence = false;
+    while (loc < endOffset) {
+      final long time = insertTabletNode.getTimes()[loc];
+      final long timePartitionId = TimePartitionUtils.getTimePartitionId(time);
+
+      // judge if we should insert sequence
+      if (timePartitionId != beforeTimePartition) {
+        updateSplitInfo(splitInfo, beforeTimePartition, isSequence, new int[] 
{before, loc});
+        before = loc;
+        beforeTimePartition = timePartitionId;

Review Comment:
   We don't need to split the rows by time partition here, nor to distinguish 
whether the MemTable is sequence or unsequence. We just need to write all the 
data of each tablet into the MemTable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to