zhanglingzhe0820 commented on a change in pull request #1758:
URL: https://github.com/apache/iotdb/pull/1758#discussion_r508458674



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
##########
@@ -407,107 +430,141 @@ public void recover() {
   }
 
   @Override
-  public void forkCurrentFileList(long timePartition) throws IOException {
-    Pair<Long, Map<Path, MeasurementSchema>> seqResult = forkTsFileList(
+  public void forkCurrentFileList(long timePartition) {
+    forkTsFileList(
         forkedSequenceTsFileResources,
-        sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources));
-    forkedSeqListPointNum = seqResult.left;
-    forkedSeqListPathMeasurementSchemaMap = seqResult.right;
-    Pair<Long, Map<Path, MeasurementSchema>> unSeqResult = forkTsFileList(
+        sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
+        maxLevelNum);
+    forkTsFileList(
         forkedUnSequenceTsFileResources,
         unSequenceTsFileResources
-            .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources));
-    forkedUnSeqListPointNum = unSeqResult.left;
-    forkedUnSeqListPathMeasurementSchemaMap = unSeqResult.right;
+            .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources), maxUnseqLevelNum);
   }
 
-  private Pair<Long, Map<Path, MeasurementSchema>> forkTsFileList(
+//  private Pair<Double, Double> forkTsFileList(
+//      List<List<TsFileResource>> forkedTsFileResources,
+//      List rawTsFileResources, int currMaxLevel) {
+//    forkedTsFileResources.clear();
+//    // just fork part of the TsFile list, controlled by max_merge_chunk_point
+//    long pointNum = 0;
+//    // all flush to target file
+//    ICardinality measurementSet = new HyperLogLog(13);
+//    for (int i = 0; i < currMaxLevel - 1; i++) {
+//      List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
+//      Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources
+//          .get(i);
+//      synchronized (levelRawTsFileResources) {
+//        for (TsFileResource tsFileResource : levelRawTsFileResources) {
+//          if (tsFileResource.isClosed()) {
+//            String path = tsFileResource.getTsFile().getAbsolutePath();
+//            try {
+//              if (tsFileResource.getTsFile().exists()) {
+//                TsFileSequenceReader reader = new TsFileSequenceReader(path);
+//                List<Path> pathList = reader.getAllPaths();
+//                for (Path sensorPath : pathList) {
+//                  measurementSet.offer(sensorPath.getFullPath());
+//                  List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(sensorPath);
+//                  for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+//                    pointNum += chunkMetadata.getNumOfPoints();
+//                  }
+//                }
+//              } else {
+//                logger.info("{} tsfile does not exist", path);
+//              }
+//            } catch (IOException e) {
+//              logger.error(
+//                  "{} tsfile reader creates error", path, e);
+//            }
+//          }
+//          if (measurementSet.cardinality() > 0
+//              && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
+//            forkedLevelTsFileResources.add(tsFileResource);
+//            break;
+//          }
+//          forkedLevelTsFileResources.add(tsFileResource);
+//        }
+//      }
+//
+//      if (measurementSet.cardinality() > 0
+//          && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
+//        forkedTsFileResources.add(forkedLevelTsFileResources);
+//        break;
+//      }
+//      forkedTsFileResources.add(forkedLevelTsFileResources);
+//    }
+//
+//    // fill in empty file
+//    while (forkedTsFileResources.size() < currMaxLevel) {
+//      List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>();
+//      forkedTsFileResources.add(emptyForkedLevelTsFileResources);
+//    }
+//
+//    return new Pair<>((double) pointNum, (double) measurementSet.cardinality());
+//  }

Review comment:
       use it now
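
A note on the technique in the commented-out method: instead of materializing a Map<Path, MeasurementSchema> just to count distinct measurements, it offers each measurement path to a HyperLogLog sketch and divides the accumulated point count by the estimated cardinality. Below is a minimal, self-contained sketch of that cutoff check. It assumes the clearspring stream-lib ICardinality/HyperLogLog API (the import path is an assumption; the diff only shows the class names), and the threshold constant and helper names are illustrative, not taken from the PR.

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.clearspring.analytics.stream.cardinality.ICardinality;

import java.util.Arrays;
import java.util.List;

public class ForkCutoffSketch {

  // Illustrative stand-in for the max_merge_chunk_point setting.
  private static final long MAX_CHUNK_POINT_NUM = 100_000;

  // Returns true once the average point count per (approximately counted)
  // distinct measurement reaches the threshold, i.e. enough files have
  // been forked for this compaction round.
  static boolean shouldStopForking(List<String> measurementPaths,
      long pointNumSoFar, ICardinality measurementSet) {
    for (String path : measurementPaths) {
      measurementSet.offer(path); // re-offering a path barely moves the estimate
    }
    long distinct = measurementSet.cardinality();
    return distinct > 0 && pointNumSoFar / distinct >= MAX_CHUNK_POINT_NUM;
  }

  public static void main(String[] args) {
    // 2^13 registers gives roughly a 1.2% standard error on the estimate.
    ICardinality sketch = new HyperLogLog(13);
    List<String> paths = Arrays.asList("root.sg.d1.s1", "root.sg.d1.s2");
    System.out.println(shouldStopForking(paths, 250_000, sketch));
  }
}

As in the diff, the check uses integer division on the running totals, so the cutoff only fires once the average points per estimated measurement actually reaches the threshold.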

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
##########
@@ -407,107 +430,141 @@ public void recover() {
   }
 
   @Override
-  public void forkCurrentFileList(long timePartition) throws IOException {
-    Pair<Long, Map<Path, MeasurementSchema>> seqResult = forkTsFileList(
+  public void forkCurrentFileList(long timePartition) {
+    forkTsFileList(
         forkedSequenceTsFileResources,
-        sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources));
-    forkedSeqListPointNum = seqResult.left;
-    forkedSeqListPathMeasurementSchemaMap = seqResult.right;
-    Pair<Long, Map<Path, MeasurementSchema>> unSeqResult = forkTsFileList(
+        sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
+        maxLevelNum);
+    forkTsFileList(
         forkedUnSequenceTsFileResources,
         unSequenceTsFileResources
-            .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources));
-    forkedUnSeqListPointNum = unSeqResult.left;
-    forkedUnSeqListPathMeasurementSchemaMap = unSeqResult.right;
+            .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources), maxUnseqLevelNum);
   }
 
-  private Pair<Long, Map<Path, MeasurementSchema>> forkTsFileList(
+//  private Pair<Double, Double> forkTsFileList(
+//      List<List<TsFileResource>> forkedTsFileResources,
+//      List rawTsFileResources, int currMaxLevel) {
+//    forkedTsFileResources.clear();
+//    // just fork part of the TsFile list, controlled by max_merge_chunk_point
+//    long pointNum = 0;
+//    // all flush to target file
+//    ICardinality measurementSet = new HyperLogLog(13);
+//    for (int i = 0; i < currMaxLevel - 1; i++) {
+//      List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
+//      Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources
+//          .get(i);
+//      synchronized (levelRawTsFileResources) {
+//        for (TsFileResource tsFileResource : levelRawTsFileResources) {
+//          if (tsFileResource.isClosed()) {
+//            String path = tsFileResource.getTsFile().getAbsolutePath();
+//            try {
+//              if (tsFileResource.getTsFile().exists()) {
+//                TsFileSequenceReader reader = new TsFileSequenceReader(path);
+//                List<Path> pathList = reader.getAllPaths();
+//                for (Path sensorPath : pathList) {
+//                  measurementSet.offer(sensorPath.getFullPath());
+//                  List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(sensorPath);
+//                  for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+//                    pointNum += chunkMetadata.getNumOfPoints();
+//                  }
+//                }
+//              } else {
+//                logger.info("{} tsfile does not exist", path);
+//              }
+//            } catch (IOException e) {
+//              logger.error(
+//                  "{} tsfile reader creates error", path, e);
+//            }
+//          }
+//          if (measurementSet.cardinality() > 0
+//              && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
+//            forkedLevelTsFileResources.add(tsFileResource);
+//            break;
+//          }
+//          forkedLevelTsFileResources.add(tsFileResource);
+//        }
+//      }
+//
+//      if (measurementSet.cardinality() > 0
+//          && pointNum / measurementSet.cardinality() >= maxChunkPointNum) {
+//        forkedTsFileResources.add(forkedLevelTsFileResources);
+//        break;
+//      }
+//      forkedTsFileResources.add(forkedLevelTsFileResources);
+//    }
+//
+//    // fill in empty file
+//    while (forkedTsFileResources.size() < currMaxLevel) {
+//      List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>();
+//      forkedTsFileResources.add(emptyForkedLevelTsFileResources);
+//    }
+//
+//    return new Pair<>((double) pointNum, (double) measurementSet.cardinality());
+//  }
+
+  private void forkTsFileList(
       List<List<TsFileResource>> forkedTsFileResources,
-      List rawTsFileResources) throws IOException {
+      List rawTsFileResources, int currMaxLevel) {
     forkedTsFileResources.clear();
-    // just fork part of the TsFile list, controlled by max_merge_chunk_point
-    long pointNum = 0;
-    // all flush to target file
-    Map<Path, MeasurementSchema> pathMeasurementSchemaMap = new HashMap<>();
-    for (int i = 0; i < maxLevelNum - 1; i++) {
+    for (int i = 0; i < currMaxLevel - 1; i++) {
       List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
      Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources
           .get(i);
-      for (TsFileResource tsFileResource : levelRawTsFileResources) {
-        if (tsFileResource.isClosed()) {
-          RestorableTsFileIOWriter writer;
-          try {
-            writer = new RestorableTsFileIOWriter(
-                tsFileResource.getTsFile());
-          } catch (Exception e) {
-            logger.error("[Hot Compaction] {} open writer failed",
-                tsFileResource.getTsFile().getPath(), e);
-            continue;
+      synchronized (levelRawTsFileResources) {
+        for (TsFileResource tsFileResource : levelRawTsFileResources) {
+          if (tsFileResource.isClosed()) {
+            forkedLevelTsFileResources.add(tsFileResource);
           }
-          Map<String, Map<String, List<ChunkMetadata>>> schemaMap = writer
-              .getMetadatasForQuery();
-          for (Entry<String, Map<String, List<ChunkMetadata>>> schemaMapEntry : schemaMap
-              .entrySet()) {
-            String device = schemaMapEntry.getKey();
-            for (Entry<String, List<ChunkMetadata>> entry : schemaMapEntry.getValue()
-                .entrySet()) {
-              String measurement = entry.getKey();
-              List<ChunkMetadata> chunkMetadataList = entry.getValue();
-              for (ChunkMetadata chunkMetadata : chunkMetadataList) {
-                pointNum += chunkMetadata.getNumOfPoints();
-              }
-              pathMeasurementSchemaMap.computeIfAbsent(new Path(device, measurement), k ->
-                  new MeasurementSchema(measurement, chunkMetadataList.get(0).getDataType()));
-            }
-          }
-          writer.close();
-          forkedLevelTsFileResources.add(tsFileResource);
-        }
-        if (pathMeasurementSchemaMap.size() > 0
-            && pointNum / pathMeasurementSchemaMap.size() >= maxChunkPointNum) {
-          break;
         }
       }
-      if (pathMeasurementSchemaMap.size() > 0
-          && pointNum / pathMeasurementSchemaMap.size() >= maxChunkPointNum) {
-        break;
-      }
       forkedTsFileResources.add(forkedLevelTsFileResources);
     }
-    return new Pair<>(pointNum, pathMeasurementSchemaMap);
+
+    // fill in empty file
+    while (forkedTsFileResources.size() < currMaxLevel) {
+      List<TsFileResource> emptyForkedLevelTsFileResources = new ArrayList<>();
+      forkedTsFileResources.add(emptyForkedLevelTsFileResources);
+    }
   }
 
   @Override
   protected void merge(long timePartition) {
-    merge(forkedSequenceTsFileResources, true, timePartition);
-    merge(forkedUnSequenceTsFileResources, false, timePartition);
+    merge(forkedSequenceTsFileResources, true, timePartition, maxLevelNum, maxFileNumInEachLevel);
+    if (maxUnseqLevelNum <= 1) {
+      merge(isForceFullMerge, getTsFileList(true), forkedUnSequenceTsFileResources.get(0),
+          Long.MAX_VALUE);
+    } else {
+      merge(forkedUnSequenceTsFileResources, false, timePartition, maxUnseqLevelNum,
+          maxUnseqFileNumInEachLevel);
+    }
   }
 
   @SuppressWarnings("squid:S3776")
  private void merge(List<List<TsFileResource>> mergeResources, boolean sequence,
-      long timePartition) {
+      long timePartition, int currMaxLevel, int currMaxFileNumInEachLevel) {
     long startTimeMillis = System.currentTimeMillis();
     try {
      logger.info("{} start to filter hot compaction condition", storageGroupName);
-      long pointNum = sequence ? forkedSeqListPointNum : forkedUnSeqListPointNum;
-      Map<Path, MeasurementSchema> pathMeasurementSchemaMap =
-          sequence ? forkedSeqListPathMeasurementSchemaMap
-              : forkedUnSeqListPathMeasurementSchemaMap;
-      logger.info("{} current sg subLevel point num: {}, measurement num: {}", storageGroupName,
-          pointNum, pathMeasurementSchemaMap.size());
+//      double pointNum = sequence ? forkedSeqListPointNum : forkedUnSeqListPointNum;
+//      double measurementSize =
+//          sequence ? forkedSeqListMeasurementSize : forkedUnSeqListMeasurementSize;
+//      logger
+//          .info("{} current sg subLevel point num: {}, approximate measurement num: {}",
+//              storageGroupName, pointNum,
+//              measurementSize);

Review comment:
      use it now
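
For the dispatch in the new merge(long timePartition) override shown above: the sequence space always goes through level compaction, while the unsequence space falls back to a single full merge into the sequence files when maxUnseqLevelNum <= 1 leaves it with only one level. Below is a self-contained sketch of that branching; the field names mirror the diff, but both merge bodies are stubs standing in for the PR's real implementations.

import java.util.ArrayList;
import java.util.List;

public class MergeDispatchSketch {

  // Illustrative stand-ins for the LevelTsFileManagement fields in the diff.
  int maxLevelNum = 4;
  int maxUnseqLevelNum = 1;
  int maxFileNumInEachLevel = 10;
  int maxUnseqFileNumInEachLevel = 10;
  boolean isForceFullMerge = false;
  List<List<String>> forkedSequenceTsFileResources = new ArrayList<>();
  List<List<String>> forkedUnSequenceTsFileResources = new ArrayList<>();

  void merge(long timePartition) {
    levelMerge(forkedSequenceTsFileResources, true, timePartition,
        maxLevelNum, maxFileNumInEachLevel);
    if (maxUnseqLevelNum <= 1) {
      // Unseq level compaction is effectively off: merge every level-0
      // unsequence file into the sequence space in one full merge.
      fullMerge(isForceFullMerge, forkedUnSequenceTsFileResources.get(0));
    } else {
      levelMerge(forkedUnSequenceTsFileResources, false, timePartition,
          maxUnseqLevelNum, maxUnseqFileNumInEachLevel);
    }
  }

  // Stubs standing in for the level and full merge paths.
  void levelMerge(List<List<String>> levels, boolean sequence,
      long timePartition, int currMaxLevel, int currMaxFileNumInEachLevel) {
    System.out.println("level merge, sequence=" + sequence
        + ", levels=" + currMaxLevel);
  }

  void fullMerge(boolean force, List<String> unseqLevel0) {
    System.out.println("full merge of " + unseqLevel0.size() + " unseq files");
  }

  public static void main(String[] args) {
    MergeDispatchSketch s = new MergeDispatchSketch();
    s.forkedSequenceTsFileResources.add(new ArrayList<>());   // seq level 0
    s.forkedUnSequenceTsFileResources.add(new ArrayList<>()); // unseq level 0
    s.merge(0L);
  }
}

Keeping both paths behind the one override means callers never need to know whether unseq level compaction is enabled for the storage group.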




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

