qiaojialin commented on a change in pull request #258: [IOTDB-143] Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314209390
 
 

 ##########
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 ##########
 @@ -674,6 +739,136 @@ public void closeUnsealedTsFileProcessor(
     }
   }
 
+  public void merge(boolean fullMerge) {
+    writeLock();
+    try {
+      if (isMerging) {
+        if (logger.isInfoEnabled()) {
+          logger.info("{} Last merge is ongoing, currently consumed time: 
{}ms", storageGroupName,
+              (System.currentTimeMillis() - mergeStartTime));
+        }
+        return;
+      }
+      if (unSequenceFileList.isEmpty() || sequenceFileList.isEmpty()) {
+        logger.info("{} no files to be merged", storageGroupName);
+        return;
+      }
+
+      long budget = 
IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
+      MergeResource mergeResource = new MergeResource(sequenceFileList, 
unSequenceFileList);
+      MergeFileSelector fileSelector = getMergeFileSelector(budget, 
mergeResource);
+      try {
+        List[] mergeFiles = fileSelector.select();
+        if (mergeFiles.length == 0) {
+          logger.info("{} cannot select merge candidates under the budget {}", 
storageGroupName,
+              budget);
+          return;
+        }
+        // avoid pending tasks holds the metadata and streams
+        mergeResource.clear();
+        String taskName = storageGroupName + "-" + System.currentTimeMillis();
+        // do not cache metadata until true candidates are chosen, or too much 
metadata will be
+        // cached during selection
+        mergeResource.setCacheDeviceMeta(true);
+
+        MergeTask mergeTask = new MergeTask(mergeResource, 
storageGroupSysDir.getPath(),
+            this::mergeEndAction, taskName, fullMerge, 
fileSelector.getConcurrentMergeNum());
+        mergingModification = new ModificationFile(storageGroupSysDir + 
File.separator + MERGING_MODIFICAITON_FILE_NAME);
+        MergeManager.getINSTANCE().submitMainTask(mergeTask);
+        if (logger.isInfoEnabled()) {
+          logger.info("{} submits a merge task {}, merging {} seqFiles, {} 
unseqFiles",
+              storageGroupName, taskName, mergeFiles[0].size(), 
mergeFiles[1].size());
+        }
+        isMerging = true;
+        mergeStartTime = System.currentTimeMillis();
+
+      } catch (MergeException | IOException e) {
+        logger.error("{} cannot select file for merge", storageGroupName, e);
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  private MergeFileSelector getMergeFileSelector(long budget, MergeResource 
resource) {
+    MergeFileStrategy strategy = 
IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
+    switch (strategy) {
+      case MAX_FILE_NUM:
+        return new MaxFileMergeFileSelector(resource, budget);
+      case MAX_SERIES_NUM:
+        return new MaxSeriesMergeFileSelector(resource, budget);
+      default:
+        throw new UnsupportedOperationException("Unknown MergeFileStrategy " + 
strategy);
+    }
+  }
+
+  protected void mergeEndAction(List<TsFileResource> seqFiles, 
List<TsFileResource> unseqFiles,
 
 Review comment:
   Please add Javadoc to this method (`mergeEndAction`).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to