fanhualta commented on a change in pull request #372: 
[IOTDB-198]Reimplementation sync module
URL: https://github.com/apache/incubator-iotdb/pull/372#discussion_r326490358
 
 

 ##########
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 ##########
 @@ -950,11 +960,272 @@ protected void mergeEndAction(List<TsFileResource> 
seqFiles, List<TsFileResource
     logger.info("{} a merge task ends", storageGroupName);
   }
 
+  /**
+   * Load a new tsfile to storage group processor
+   *
+   * Firstly, determine the loading type of the file, whether it needs to be 
loaded in sequence list
+   * or unsequence list.
+   *
+   * Secondly, execute the loading process by the type.
+   *
+   * Finally, update the latestTimeForEachDevice and 
latestFlushedTimeForEachDevice.
+   *
+   * @param newTsFile new tsfile
+   * @param newTsFileResource tsfile resource
+   * @UsedBy sync module.
+   */
+  public void loadNewTsFile(File newTsFile, TsFileResource newTsFileResource)
+      throws TsFileProcessorException {
+    writeLock();
+    mergeLock.writeLock().lock();
+    try {
+      boolean isOverlap = false;
+      int preIndex = -1, subsequentIndex = sequenceFileList.size();
+      // check new tsfile
+      outer:
+      for (int i = 0; i < sequenceFileList.size(); i++) {
+        if 
(sequenceFileList.get(i).getFile().getName().equals(newTsFile.getName())) {
+          return;
+        }
+        if (i == sequenceFileList.size() - 1 && 
sequenceFileList.get(i).getEndTimeMap().isEmpty()) {
+          continue;
+        }
+        int preCnt = 0, subsequenceCnt = 0;
+        for (String device : newTsFileResource.getStartTimeMap().keySet()) {
+          if (sequenceFileList.get(i).getStartTimeMap().containsKey(device)) {
+            long startTime1 = 
sequenceFileList.get(i).getStartTimeMap().get(device);
+            long endTime1 = 
sequenceFileList.get(i).getEndTimeMap().get(device);
+            long startTime2 = newTsFileResource.getStartTimeMap().get(device);
+            long endTime2 = newTsFileResource.getEndTimeMap().get(device);
+            if (startTime1 > endTime2) {
+              subsequenceCnt++;
+            } else if (startTime2 > endTime1) {
+              preCnt++;
+            } else {
+              isOverlap = true;
+              break outer;
+            }
+          }
+        }
+        if (preCnt != 0 && subsequenceCnt != 0) {
+          isOverlap = true;
+          break;
+        }
+        if (preCnt == 0 && subsequenceCnt != 0) {
+          subsequentIndex = i;
+          break;
+        }
+        if (preCnt != 0 && subsequenceCnt == 0) {
+          preIndex = i;
+        }
+      }
+
+      // loading tsfile by type
+      if (isOverlap) {
+        loadTsFileByType(-1, newTsFile, newTsFileResource, 
unSequenceFileList.size());
+      } else {
+        if (subsequentIndex != sequenceFileList.size()) {
+          loadTsFileByType(1, newTsFile, newTsFileResource, subsequentIndex);
+        } else {
+          if (preIndex != -1) {
+            loadTsFileByType(1, newTsFile, newTsFileResource, preIndex + 1);
+          } else {
+            loadTsFileByType(1, newTsFile, newTsFileResource, 
sequenceFileList.size());
+          }
+        }
+      }
+
+      // update latest time map
+      updateLatestTimeMap(newTsFileResource);
+    } catch (TsFileProcessorException | DiskSpaceInsufficientException e) {
+      logger.error("Failed to append the tsfile {} to storage group processor 
{}.",
+          newTsFile.getAbsolutePath(), newTsFile.getParentFile().getName());
+      throw new TsFileProcessorException(e);
+    } finally {
+      mergeLock.writeLock().unlock();
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Update latest time in latestTimeForEachDevice and 
latestFlushedTimeForEachDevice.
+   *
+   * @UsedBy sync module
+   */
+  private void updateLatestTimeMap(TsFileResource newTsFileResource) {
+    for (Entry<String, Long> entry : 
newTsFileResource.getEndTimeMap().entrySet()) {
+      String device = entry.getKey();
+      long endTime = newTsFileResource.getEndTimeMap().get(device);
+      if (!latestTimeForEachDevice.containsKey(device)
+          || latestTimeForEachDevice.get(device) < endTime) {
+        latestTimeForEachDevice.put(device, endTime);
+      }
+      if (!latestFlushedTimeForEachDevice.containsKey(device)
+          || latestFlushedTimeForEachDevice.get(device) < endTime) {
+        latestFlushedTimeForEachDevice.put(device, endTime);
+      }
+    }
+  }
+
+  /**
+   * Execute the loading process by the type.
+   *
+   * @param type load type: 1 sequence tsfile ; 2 unsequence tsfile
+   * @param tsFile tsfile to be loaded
+   * @param tsFileResource tsfile resource to be loaded
+   * @param index the index in sequenceFileList/unSequenceFileList
+   * @UsedBy sync module
+   */
+  private void loadTsFileByType(int type, File tsFile, TsFileResource 
tsFileResource, int index)
+      throws TsFileProcessorException, DiskSpaceInsufficientException {
+    File targetFile;
+    if (type == -1) {
+      targetFile =
+          new 
File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+              tsFile.getParentFile().getName() + File.separatorChar + 
tsFile.getName());
+      tsFileResource.setFile(targetFile);
+      unSequenceFileList.add(index, tsFileResource);
+      logger
+          .info("Load tsfile in unsequence list, move file from {} to {}", 
tsFile.getAbsolutePath(),
+              targetFile.getAbsolutePath());
+    } else {
+      targetFile =
+          new 
File(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+              tsFile.getParentFile().getName() + File.separatorChar + 
getFileNameForLoadingFile(
+                  tsFile.getName(), index));
+      tsFileResource.setFile(targetFile);
+      sequenceFileList.add(index, tsFileResource);
+      logger.info("Load tsfile in sequence list, move file from {} to {}", 
tsFile.getAbsolutePath(),
+          targetFile.getAbsolutePath());
+    }
+
+    // move file from sync dir to data dir
+    if (!targetFile.getParentFile().exists()) {
+      targetFile.getParentFile().mkdirs();
+    }
+    if (tsFile.exists() && !targetFile.exists()) {
+      try {
+        FileUtils.moveFile(tsFile, targetFile);
+      } catch (IOException e) {
+        throw new TsFileProcessorException(String.format(
+            "File renaming failed when loading tsfile. Origin: %s, Target: %s",
+            tsFile.getAbsolutePath(), targetFile.getAbsolutePath()));
+      }
+    }
+    if (new File(tsFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX).exists() && !new File(
+        targetFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX).exists()) {
 
 Review comment:
   Actually, it's redundant. It has been removed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to