shuwenwei commented on code in PR #13097:
URL: https://github.com/apache/iotdb/pull/13097#discussion_r1724183633


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java:
##########
@@ -36,43 +37,77 @@
 
 public abstract class AbstractInnerCompactionWriter extends 
AbstractCompactionWriter {
   protected CompactionTsFileWriter fileWriter;
+  protected List<TsFileResource> targetResources;
+  protected int currentFileIndex;
+  protected long endedFileSize = 0;
 
   protected boolean isEmptyFile;
 
-  protected TsFileResource targetResource;
+  protected final long sizeForFileWriter =
+      (long)
+          ((double) SystemInfo.getInstance().getMemorySizeForCompaction()
+              / 
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
+              * 
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
 
-  protected AbstractInnerCompactionWriter(TsFileResource targetFileResource) 
throws IOException {
-    long sizeForFileWriter =
-        (long)
-            ((double) SystemInfo.getInstance().getMemorySizeForCompaction()
-                / 
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
-                * 
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
-    this.targetResource = targetFileResource;
-    this.fileWriter =
-        new CompactionTsFileWriter(
-            targetFileResource.getTsFile(),
-            sizeForFileWriter,
-            targetResource.isSeq()
-                ? CompactionType.INNER_SEQ_COMPACTION
-                : CompactionType.INNER_UNSEQ_COMPACTION);
-    isEmptyFile = true;
+  protected AbstractInnerCompactionWriter(TsFileResource targetFileResource) {
+    this(Collections.singletonList(targetFileResource));
   }
 
-  @Override
-  protected List<CompactionTsFileWriter> getAllTargetFileWriter() {
-    return Collections.singletonList(fileWriter);
+  protected AbstractInnerCompactionWriter(List<TsFileResource> 
targetFileResources) {
+    this.targetResources = targetFileResources;
+    isEmptyFile = true;
   }
 
   @Override
   public void startChunkGroup(IDeviceID deviceId, boolean isAlign) throws 
IOException {
+    fileWriter = getAvailableWriter();
     fileWriter.startChunkGroup(deviceId);
     this.isAlign = isAlign;
     this.deviceId = deviceId;
   }
 
+  private CompactionTsFileWriter getAvailableWriter() throws IOException {
+    if (fileWriter == null) {
+      useNewWriter();
+      return fileWriter;
+    }
+    boolean shouldSwitchToNextWriter =
+        fileWriter.getPos()
+                >= 
IoTDBDescriptor.getInstance().getConfig().getTargetCompactionFileSize()
+            && (currentFileIndex != targetResources.size() - 1);
+    if (shouldSwitchToNextWriter) {
+      rollCompactionFileWriter();
+    }
+    return fileWriter;
+  }
+
+  private void rollCompactionFileWriter() throws IOException {
+    fileWriter.endFile();
+    endedFileSize += fileWriter.getPos();
+    fileWriter.close();
+    if (fileWriter.isEmptyTargetFile()) {
+      targetResources.get(currentFileIndex).forceMarkDeleted();
+    }
+    fileWriter = null;
+
+    currentFileIndex++;
+    useNewWriter();
+  }
+
+  private void useNewWriter() throws IOException {
+    fileWriter =
+        new CompactionTsFileWriter(
+            targetResources.get(currentFileIndex).getTsFile(),
+            sizeForFileWriter,
+            targetResources.get(currentFileIndex).isSeq()
+                ? CompactionType.INNER_SEQ_COMPACTION
+                : CompactionType.INNER_UNSEQ_COMPACTION);
+    
fileWriter.setSchema(CompactionTableSchemaCollector.copySchema(schemas.get(0)));

Review Comment:
   There are some differences between using a new Schema object and copying the 
collected Schema object. The purpose of collecting schemas before merging is to 
get the schemas of all table models. At that point, only the id columns have 
been generated; the measurement columns of each table schema are modified when 
CompactionTsFileWriter.endChunkGroup() is called. However, since a device may 
be split across multiple files, some tables will end up not being written to a 
given file, and those TableSchema objects will be removed during endFile.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to