qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313852348
 
 

 ##########
 File path: server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java
 ##########
 @@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.selector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.utils.MergeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MaxFileMergeFileSelector selects the most files from the given seqFiles and unseqFiles which
+ * can be merged without exceeding the given memory budget. It always assumes the number of
+ * timeseries being queried at the same time is 1 to maximize the number of files merged.
+ */
+public class MaxFileMergeFileSelector implements MergeFileSelector {
+
+  private static final Logger logger = LoggerFactory.getLogger(MaxFileMergeFileSelector.class);
+  private static final String LOG_FILE_COST = "Memory cost of file {} is {}";
+
+  MergeResource resource;
+
+  long totalCost;
+  private long memoryBudget;
+  private long maxSeqFileCost;
+
+  // the number of timeseries being queried at the same time
+  int concurrentMergeNum = 1;
+
+  /**
+   * Total metadata size of each file.
+   */
+  private Map<TsFileResource, Long> fileMetaSizeMap = new HashMap<>();
+  /**
+   * Maximum memory cost of querying a timeseries in each file.
+   */
+  private Map<TsFileResource, Long> maxSeriesQueryCostMap = new HashMap<>();
+
+  List<TsFileResource> selectedUnseqFiles;
+  List<TsFileResource> selectedSeqFiles;
+
+  private Collection<Integer> tmpSelectedSeqFiles;
+  private long tempMaxSeqFileCost;
+
+  private boolean[] seqSelected;
+  private int seqSelectedNum;
+
+  public MaxFileMergeFileSelector(MergeResource resource, long memoryBudget) {
+    this.resource = resource;
+    this.memoryBudget = memoryBudget;
+  }
+
+  /**
+   * Select merge candidates from seqFiles and unseqFiles under the given memoryBudget.
+   * This process iteratively adds the next unseqFile from unseqFiles and its overlapping seqFiles
+   * as newly-added candidates and computes their estimated memory cost. If the current cost plus
+   * the new cost is still under the budget, accept the unseqFile and the seqFiles as candidates,
+   * otherwise go to the next iteration.
+   * The memory cost of a file is calculated in two ways:
+   *    The rough estimation: for a seqFile, the size of its metadata is used for estimation,
+   *    since in the worst case the file contains only one timeseries, all its metadata will be
+   *    loaded into memory with at most one actual data page (which is negligible), and writing
+   *    the timeseries into a new file generates metadata of a similar size. So the size of all
+   *    seqFiles' metadata (generated when writing new chunks) plus the largest one (loaded when
+   *    reading a timeseries from the seqFiles) is the total estimation of all seqFiles. For an
+   *    unseqFile, since the merge reader may read all chunks of a series to perform a merge
+   *    read, the whole file may be loaded into memory, so we use the file's length as the
+   *    maximum estimation.
+   *    The tight estimation: based on the rough estimation, we scan the file's metadata to
+   *    count the number of chunks of each series, find the series which has the most chunks
+   *    in the file and use its chunk proportion to refine the rough estimation.
+   * The rough estimation is performed first; if no candidates can be found with it, we run the
+   * selection again with the tight estimation.
+   * @return two lists of TsFileResource: the former is the selected seqFiles and the latter is
+   * the selected unseqFiles, or an empty array if there are no proper candidates under the budget.
+   * @throws MergeException if an IOException occurs while estimating the costs
+   */
+  @Override
+  public List[] select() throws MergeException {
+    long startTime = System.currentTimeMillis();
+    try {
+      logger.info("Selecting merge candidates from {} seqFile, {} unseqFiles",
+          resource.getSeqFiles().size(), resource.getUnseqFiles().size());
+      select(false);
+      if (selectedUnseqFiles.isEmpty()) {
+        select(true);
+      }
+      resource.setSeqFiles(selectedSeqFiles);
+      resource.setUnseqFiles(selectedUnseqFiles);
+      resource.removeOutdatedSeqReaders();
+      if (selectedUnseqFiles.isEmpty()) {
+        logger.info("No merge candidates are found");
+        return new List[0];
+      }
+    } catch (IOException e) {
+      throw new MergeException(e);
+    }
+    if (logger.isInfoEnabled()) {
+      logger.info("Selected merge candidates, {} seqFiles, {} unseqFiles, 
total memory cost {}, "
+              + "time consumption {}ms",
+          selectedSeqFiles.size(), selectedUnseqFiles.size(), totalCost,
+          System.currentTimeMillis() - startTime);
+    }
+    return new List[]{selectedSeqFiles, selectedUnseqFiles};
+  }
+
+  void select(boolean useTightBound) throws IOException {
+    tmpSelectedSeqFiles = new HashSet<>();
+    seqSelected = new boolean[resource.getSeqFiles().size()];
+    seqSelectedNum = 0;
+    selectedSeqFiles = new ArrayList<>();
+    selectedUnseqFiles = new ArrayList<>();
+    maxSeqFileCost = 0;
+    tempMaxSeqFileCost = 0;
+
+    totalCost = 0;
+
+    int unseqIndex = 0;
+    long startTime = System.currentTimeMillis();
+    long timeConsumption = 0;
+    long timeLimit = IoTDBDescriptor.getInstance().getConfig().getMergeFileSelectionTimeBudget();
+    if (timeLimit < 0) {
+      timeLimit = Long.MAX_VALUE;
+    }
+    while (unseqIndex < resource.getUnseqFiles().size() && timeConsumption < timeLimit) {
+      // select the next unseq file
+      TsFileResource unseqFile = resource.getUnseqFiles().get(unseqIndex);
+
+      selectOverlappedSeqFiles(unseqFile);
+
+      tempMaxSeqFileCost = maxSeqFileCost;
+      long newCost = useTightBound ? calculateTightMemoryCost(unseqFile, tmpSelectedSeqFiles,
+          startTime, timeLimit) :
+          calculateLooseMemoryCost(unseqFile, tmpSelectedSeqFiles, startTime, timeLimit);
+
+      if (totalCost + newCost < memoryBudget) {
+        selectedUnseqFiles.add(unseqFile);
+        maxSeqFileCost = tempMaxSeqFileCost;
+
+        for (Integer seqIdx : tmpSelectedSeqFiles) {
+          seqSelected[seqIdx] = true;
+          seqSelectedNum++;
+          selectedSeqFiles.add(resource.getSeqFiles().get(seqIdx));
+        }
+        totalCost += newCost;
+        logger.debug("Adding a new unseqFile {} and seqFiles {} as candidates, 
new cost {}, total"
+                + " cost {}",
+            unseqFile, tmpSelectedSeqFiles, newCost, totalCost);
+      }
+      tmpSelectedSeqFiles.clear();
+      unseqIndex++;
+      timeConsumption = System.currentTimeMillis() - startTime;
+    }
+  }
+
+  private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
+    if (seqSelectedNum == resource.getSeqFiles().size()) {
+      return;
+    }
+    int tmpSelectedNum = 0;
+    for (Entry<String, Long> deviceStartTimeEntry : 
unseqFile.getStartTimeMap().entrySet()) {
+      String deviceId = deviceStartTimeEntry.getKey();
+      Long unseqStartTime = deviceStartTimeEntry.getValue();
+      Long unseqEndTime = unseqFile.getEndTimeMap().get(deviceId);
+
+      boolean noMoreOverlap = false;
+      for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
+        TsFileResource seqFile = resource.getSeqFiles().get(i);
+        if (seqSelected[i] || !seqFile.getEndTimeMap().containsKey(deviceId)) {
+          continue;
+        }
+        Long seqEndTime = seqFile.getEndTimeMap().get(deviceId);
+        if (unseqEndTime <= seqEndTime) {
+          // the unseqFile overlaps the current seqFile
+          tmpSelectedSeqFiles.add(i);
+          tmpSelectedNum++;
+          // the device of the unseqFile cannot merge with later seqFiles
+          noMoreOverlap = true;
+        } else if (unseqStartTime <= seqEndTime) {
+          // the device of the unseqFile may merge with later seqFiles
+          // and the unseqFile overlaps current seqFile
+          tmpSelectedSeqFiles.add(i);
+          tmpSelectedNum++;
+        }
+      }
+      if (tmpSelectedNum + seqSelectedNum == resource.getSeqFiles().size()) {
+        break;
+      }
+    }
+  }
+
+  private long calculateMemoryCost(TsFileResource tmpSelectedUnseqFile,
+      Collection<Integer> tmpSelectedSeqFiles, FileQueryMemMeasurement unseqMeasurement,
+      FileQueryMemMeasurement seqMeasurement, long startTime, long timeLimit) throws IOException {
+    long cost = 0;
+    Long fileCost = unseqMeasurement.measure(tmpSelectedUnseqFile);
+    cost += fileCost;
+
+    for (Integer seqFileIdx : tmpSelectedSeqFiles) {
+      TsFileResource seqFile = resource.getSeqFiles().get(seqFileIdx);
+      fileCost = seqMeasurement.measure(seqFile);
+      if (fileCost > tempMaxSeqFileCost) {
+        // only one file will be read at the same time, so only the largest one is recorded here
+        cost -= tempMaxSeqFileCost;
+        cost += fileCost;
+        tempMaxSeqFileCost = fileCost;
+      }
+      // but writing data into a new file may generate the same amount of metadata in memory
+      cost += calculateMetadataSize(seqFile);
+      long timeConsumption = System.currentTimeMillis() - startTime;
+      if (timeConsumption > timeLimit) {
+        return Long.MAX_VALUE;
+      }
+    }
+    return cost;
+  }
+
+  private long calculateLooseMemoryCost(TsFileResource tmpSelectedUnseqFile,
+      Collection<Integer> tmpSelectedSeqFiles, long startTime, long timeLimit) throws IOException {
+    return calculateMemoryCost(tmpSelectedUnseqFile, tmpSelectedSeqFiles,
+        TsFileResource::getFileSize, this::calculateMetadataSize, startTime, timeLimit);
+  }
+
+  private long calculateTightMemoryCost(TsFileResource tmpSelectedUnseqFile,
+      Collection<Integer> tmpSelectedSeqFiles, long startTime, long timeLimit) throws IOException {
+    return calculateMemoryCost(tmpSelectedUnseqFile, tmpSelectedSeqFiles,
+        this::calculateTightUnseqMemoryCost, this::calculateTightSeqMemoryCost, startTime,
+        timeLimit);
+  }
+
+  private long calculateMetadataSize(TsFileResource seqFile) throws IOException {
+    Long cost = fileMetaSizeMap.get(seqFile);
+    if (cost == null) {
+      cost = MergeUtils.getFileMetaSize(seqFile, resource.getFileReader(seqFile));
+      fileMetaSizeMap.put(seqFile, cost);
+      logger.debug(LOG_FILE_COST, seqFile, cost);
+    }
+    return cost;
+  }
+
+  private long calculateTightFileMemoryCost(TsFileResource seqFile,
+      FileQueryMemMeasurement measurement)
 
 Review comment:
   add javadoc
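   For example, a sketch inferred from the tight-estimation description in the class-level
   javadoc (the method body is truncated in this diff, so the wording below is an assumption
   about its behavior):

       /**
        * Estimates the memory cost of reading a timeseries from the given file under the tight
        * bound: scans the file's metadata, finds the series with the most chunks, and refines
        * the given rough measurement by that series' chunk proportion.
        */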

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
