[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-16 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314606772
 
 

 ##
 File path: 
tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoader.java
 ##
 @@ -34,4 +34,8 @@
*/
   void close() throws IOException;
 
+  /**
+   * clear Chunk cache if used.
+   */
+  void clear();
 
 Review comment:
   Renamed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-16 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314606732
 
 

 ##
 File path: 
tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerier.java
 ##
 @@ -62,4 +62,9 @@
*/
   List convertSpace2TimePartition(List paths, long 
spacePartitionStartPos,
   long spacePartitionEndPos) throws IOException;
+
+  /**
+   * clear caches (if used) to release memory.
+   */
+  void clear();
 
 Review comment:
   Renamed.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-16 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314606070
 
 

 ##
 File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
 ##
 @@ -28,11 +28,12 @@
 
   private ChunkHeader chunkHeader;
   private ByteBuffer chunkData;
-  private long deletedAt = -1;
+  private long deletedAt;
 
-  public Chunk(ChunkHeader header, ByteBuffer buffer) {
+  public Chunk(ChunkHeader header, ByteBuffer buffer, long deletedAt) {
 this.chunkHeader = header;
 this.chunkData = buffer;
+this.deletedAt = deletedAt;
 
 Review comment:
   Added.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-16 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314605654
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/utils/TsPrimitiveType.java
 ##
 @@ -74,6 +74,30 @@ public Binary getBinary() {
 throw new UnsupportedOperationException("getBinary() is not supported for 
current sub-class");
   }
 
+  public void setBoolean(boolean val) {
+throw new UnsupportedOperationException("getBoolean() is not supported for 
current sub-class");
+  }
+
+  public void setInt(int val) {
+throw new UnsupportedOperationException("getInt() is not supported for 
current sub-class");
 
 Review comment:
   Fixed.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-16 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314605585
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/utils/TsPrimitiveType.java
 ##
 @@ -74,6 +74,30 @@ public Binary getBinary() {
 throw new UnsupportedOperationException("getBinary() is not supported for 
current sub-class");
   }
 
+  public void setBoolean(boolean val) {
+throw new UnsupportedOperationException("getBoolean() is not supported for 
current sub-class");
 
 Review comment:
   Fixed.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-16 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314604701
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
 ##
 @@ -109,52 +109,6 @@ public long assignJobId() {
 return jobId;
   }
 
-  /**
 
 Review comment:
   Fixed.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-16 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314601212
 
 

 ##
 File path: server/src/assembly/resources/conf/iotdb-engine.properties
 ##
 @@ -155,7 +155,52 @@ concurrent_flush_thread=0
 
 # whether take over the memory management by IoTDB rather than JVM when 
serializing memtable as bytes in memory
 # (i.e., whether use ChunkBufferPool), value true, false
-chunk_buffer_pool_enable = false
+chunk_buffer_pool_enable=false
+
+
+### Merge Configurations
+
+
+# How many thread will be set up to perform merge main tasks, 1 by default.
+# Set to 1 when less than or equal to 0.
+merge_thread_num=1
+
+# How many thread will be set up to perform merge chunk sub-tasks, 8 by 
default.
+# Set to 1 when less than or equal to 0.
+merge_chunk_subthread_num=8
+
+# If one merge file selection runs for more than this time, it will be ended 
and its current
+# selection will be used as final selection. Unit: millis.
+# When < 0, it means time is unbounded.
+merge_fileSelection_time_budget=3
+
+# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM 
memory by default.
+# This is only a rough estimation, starting from a relatively small value to 
avoid OOM.
+# Each new merge thread may take such memory, so merge_thread_num * 
merge_memory_budget is the
+# total memory estimation of merge.
+# merge_memory_budget=2147483648
+
+# When set to true, if some crashed merges are detected during system 
rebooting, such merges will
+# be continued, otherwise, the unfinished parts of such merges will not be 
continued while the
+# finished parts still remains as they are.
+# If you are feeling the rebooting is too slow, set this to false, false by 
default
+continue_merge_after_reboot=false
+
+# A global merge will be performed each such interval, that is, each storage 
group will be merged
+# (if proper merge candidates can be found). Unit: second, default: 1hours.
+# When less than or equal to 0, timed merge is disabled.
+merge_interval_sec=3600
+
+# When set to true, all merges becomes full merge (the whole SeqFiles are 
re-written despite how
+# much they are overflowed). This may increase merge overhead depending on how 
much the SeqFiles
+# are overflowed.
+force_full_merge=false
+
+# During a merge, if a chunk with less number of chunks than this parameter, 
the chunk will be
+# merged with its succeeding chunks even if it is not overflowed, until the 
merged chunks reach
+# this threshold and the new chunk will be flushed.
+# When less than 0, this mechanism is disabled.
+chunk_merge_point_threshold=4096
 
 Review comment:
   Any suggestion?
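
The fallback rules stated in the property comments above (a thread count of 0 or less is treated as 1, an interval of 0 or less disables timed merge) could be applied roughly as follows. This is only a sketch built on `java.util.Properties`; the class `MergeConfigSketch` and its methods are hypothetical and are not the loader used in this PR.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of IoTDB: applies the documented fallbacks.
final class MergeConfigSketch {

  // Parses properties text; I/O errors from the in-memory reader are rethrown unchecked.
  static Properties parse(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  // merge_thread_num: 1 by default, and 1 when the configured value is <= 0.
  static int mergeThreadNum(Properties props) {
    int value = Integer.parseInt(props.getProperty("merge_thread_num", "1").trim());
    return value <= 0 ? 1 : value;
  }

  // merge_interval_sec: timed merge is disabled when the value is <= 0.
  static boolean timedMergeEnabled(Properties props) {
    long seconds = Long.parseLong(props.getProperty("merge_interval_sec", "3600").trim());
    return seconds > 0;
  }

  public static void main(String[] args) {
    Properties props = parse("merge_thread_num=0\nmerge_interval_sec=-1\n");
    System.out.println(mergeThreadNum(props));
    System.out.println(timedMergeEnabled(props));
  }
}
```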




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-16 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314601299
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetadataUtils.java
 ##
 @@ -95,6 +97,9 @@ public static TsDeviceMetadata getTsDeviceMetaData(String 
filePath, Path seriesP
 }
   }
 }
+for (List chunkMetaDataList : 
pathToChunkMetaDataList.values()) {
+  
chunkMetaDataList.sort(Comparator.comparingLong(ChunkMetaData::getStartTime));
+}
 
 Review comment:
   Yes.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-16 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314601067
 
 

 ##
 File path: server/src/assembly/resources/conf/iotdb-engine.properties
 ##
 @@ -155,7 +155,52 @@ concurrent_flush_thread=0
 
 # whether take over the memory management by IoTDB rather than JVM when 
serializing memtable as bytes in memory
 # (i.e., whether use ChunkBufferPool), value true, false
-chunk_buffer_pool_enable = false
+chunk_buffer_pool_enable=false
+
+
+### Merge Configurations
+
+
+# How many thread will be set up to perform merge main tasks, 1 by default.
+# Set to 1 when less than or equal to 0.
+merge_thread_num=1
+
+# How many thread will be set up to perform merge chunk sub-tasks, 8 by 
default.
+# Set to 1 when less than or equal to 0.
+merge_chunk_subthread_num=8
+
+# If one merge file selection runs for more than this time, it will be ended 
and its current
+# selection will be used as final selection. Unit: millis.
+# When < 0, it means time is unbounded.
+merge_fileSelection_time_budget=3
+
+# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM 
memory by default.
+# This is only a rough estimation, starting from a relatively small value to 
avoid OOM.
+# Each new merge thread may take such memory, so merge_thread_num * 
merge_memory_budget is the
+# total memory estimation of merge.
+# merge_memory_budget=2147483648
+
+# When set to true, if some crashed merges are detected during system 
rebooting, such merges will
+# be continued, otherwise, the unfinished parts of such merges will not be 
continued while the
+# finished parts still remains as they are.
+# If you are feeling the rebooting is too slow, set this to false, false by 
default
+continue_merge_after_reboot=false
+
+# A global merge will be performed each such interval, that is, each storage 
group will be merged
+# (if proper merge candidates can be found). Unit: second, default: 1hours.
+# When less than or equal to 0, timed merge is disabled.
+merge_interval_sec=3600
+
+# When set to true, all merges becomes full merge (the whole SeqFiles are 
re-written despite how
+# much they are overflowed). This may increase merge overhead depending on how 
much the SeqFiles
+# are overflowed.
+force_full_merge=false
+
+# During a merge, if a chunk with less number of chunks than this parameter, 
the chunk will be
 
 Review comment:
   Fixed.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-16 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314600996
 
 

 ##
 File path: client/src/main/java/org/apache/iotdb/cli/client/AbstractClient.java
 ##
 @@ -728,12 +728,10 @@ protected static void importCmd(String specialCmd, 
String cmd, IoTDBConnection c
 }
   }
 
-  protected static void executeQuery(IoTDBConnection connection, String cmd) {
-Statement statement = null;
+  private static void executeQuery(IoTDBConnection connection, String cmd) {
 long startTime = System.currentTimeMillis();
-try {
+try (Statement statement = connection.createStatement();) {
 
 Review comment:
   Fixed.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314578681
 
 

 ##
 File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java
 ##
 @@ -185,7 +185,7 @@ public int hashCode() {
 
   @Override
   public boolean equals(Object obj) {
-return obj != null && obj instanceof Path && this.fullPath.equals(((Path) 
obj).fullPath);
+return obj instanceof Path && this.fullPath.equals(((Path) obj).fullPath);
 
 Review comment:
   instanceof always returns false for null.
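
A minimal sketch of why the explicit null check is redundant. `SimplePath` is a hypothetical stand-in for the `Path` class in the diff, reduced to the single field that `equals` uses.

```java
import java.util.Objects;

// Hypothetical stand-in for the Path class shown in the diff above.
final class SimplePath {
  private final String fullPath;

  SimplePath(String fullPath) {
    this.fullPath = Objects.requireNonNull(fullPath);
  }

  @Override
  public boolean equals(Object obj) {
    // instanceof evaluates to false when obj is null, so no separate null check is needed.
    return obj instanceof SimplePath && this.fullPath.equals(((SimplePath) obj).fullPath);
  }

  @Override
  public int hashCode() {
    return fullPath.hashCode();
  }

  public static void main(String[] args) {
    SimplePath a = new SimplePath("root.sg.d1.s1");
    System.out.println(a.equals(null));                            // false
    System.out.println(a.equals(new SimplePath("root.sg.d1.s1"))); // true
  }
}
```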




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314578609
 
 

 ##
 File path: 
tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetaData.java
 ##
 @@ -108,6 +114,8 @@ public static TsFileMetaData deserializeFrom(InputStream 
inputStream) throws IOE
 if (ReadWriteIOUtils.readIsNull(inputStream)) {
   fileMetaData.createdBy = ReadWriteIOUtils.readString(inputStream);
 }
+fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(inputStream);
 
 Review comment:
   We should discuss compatibility somewhere else.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314578352
 
 

 ##
 File path: 
tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetaData.java
 ##
 @@ -151,6 +159,8 @@ public static TsFileMetaData deserializeFrom(ByteBuffer 
buffer) throws IOExcepti
 if (ReadWriteIOUtils.readIsNull(buffer)) {
   fileMetaData.createdBy = ReadWriteIOUtils.readString(buffer);
 }
+fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(buffer);
 
 Review comment:
   I think keeping compatibility needs a much more delicate mechanism than 
this, and it is beyond the scope of this PR.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314578160
 
 

 ##
 File path: 
tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
 ##
 @@ -135,9 +135,9 @@
*/
   public static int pageCheckSizeThreshold = 100;
   /**
-   * Default endian value is LITTLE_ENDIAN.
+   * Default endian value is BIG_ENDIAN.
 
 Review comment:
   The put methods in ByteBuffer use BIG_ENDIAN by default, so making BIG_ENDIAN 
the default may remove a lot of meaningless lines.
   As for compatibility, no, but you could implement BIG_ENDIAN (which we 
should have) in 0.8.0 and change the config.
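
The default byte order can be checked with the JDK alone. The sketch below uses only `java.nio.ByteBuffer` and `java.nio.ByteOrder`; the class name `ByteOrderDemo` is made up for illustration and nothing here is TsFile code.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

final class ByteOrderDemo {

  // Encodes an int with the buffer's default order (no order() call needed).
  static byte[] encodeDefault(int value) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
  }

  // Encodes an int after explicitly switching the buffer to little-endian.
  static byte[] encodeLittleEndian(int value) {
    return ByteBuffer.allocate(Integer.BYTES)
        .order(ByteOrder.LITTLE_ENDIAN)
        .putInt(value)
        .array();
  }

  public static void main(String[] args) {
    System.out.println(ByteBuffer.allocate(4).order());                  // BIG_ENDIAN
    System.out.println(Arrays.toString(encodeDefault(0x01020304)));      // [1, 2, 3, 4]
    System.out.println(Arrays.toString(encodeLittleEndian(0x01020304))); // [4, 3, 2, 1]
  }
}
```

With a little-endian default, every buffer that is written or read needs the extra `order(...)` call, which is presumably the kind of line the comment has in mind.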




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314577395
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java
 ##
 @@ -64,17 +74,93 @@ public static TimeValuePair 
getCurrentTimeValuePair(AggreResultData data) {
   case INT32:
 return new TimeValuePair(data.getTimestamp(), new 
TsPrimitiveType.TsInt(data.getIntRet()));
   case INT64:
-return new TimeValuePair(data.getTimestamp(), new 
TsPrimitiveType.TsLong(data.getLongRet()));
+return new TimeValuePair(data.getTimestamp(),
+new TsPrimitiveType.TsLong(data.getLongRet()));
   case FLOAT:
-return new TimeValuePair(data.getTimestamp(), new 
TsPrimitiveType.TsFloat(data.getFloatRet()));
+return new TimeValuePair(data.getTimestamp(),
+new TsPrimitiveType.TsFloat(data.getFloatRet()));
   case DOUBLE:
-return new TimeValuePair(data.getTimestamp(), new 
TsPrimitiveType.TsDouble(data.getDoubleRet()));
+return new TimeValuePair(data.getTimestamp(),
+new TsPrimitiveType.TsDouble(data.getDoubleRet()));
   case TEXT:
-return new TimeValuePair(data.getTimestamp(), new 
TsPrimitiveType.TsBinary(data.getBinaryRet()));
+return new TimeValuePair(data.getTimestamp(),
+new TsPrimitiveType.TsBinary(data.getBinaryRet()));
   case BOOLEAN:
-return new TimeValuePair(data.getTimestamp(), new 
TsPrimitiveType.TsBoolean(data.isBooleanRet()));
+return new TimeValuePair(data.getTimestamp(),
+new TsPrimitiveType.TsBoolean(data.isBooleanRet()));
   default:
 throw new 
UnSupportedDataTypeException(String.valueOf(data.getDataType()));
 }
   }
-}
+
+  public static void setCurrentTimeValuePair(BatchData data, TimeValuePair 
current) {
+current.setTimestamp(data.currentTime());
+switch (data.getDataType()) {
+  case INT32:
+current.getValue().setInt(data.getInt());
+break;
+  case INT64:
+current.getValue().setLong(data.getLong());
+break;
+  case FLOAT:
+current.getValue().setFloat(data.getFloat());
+break;
+  case DOUBLE:
+current.getValue().setDouble(data.getDouble());
+break;
+  case TEXT:
+current.getValue().setBinary(data.getBinary());
+break;
+  case BOOLEAN:
+current.getValue().setBoolean(data.getBoolean());
+break;
+  default:
+throw new 
UnSupportedDataTypeException(String.valueOf(data.getDataType()));
+}
+  }
+
+  public static void setTimeValuePair(TimeValuePair from, TimeValuePair to) {
+to.setTimestamp(from.getTimestamp());
+switch (from.getValue().getDataType()) {
+  case INT32:
+to.getValue().setInt(from.getValue().getInt());
+break;
+  case INT64:
+to.getValue().setLong(from.getValue().getLong());
+break;
+  case FLOAT:
+to.getValue().setFloat(from.getValue().getFloat());
+break;
+  case DOUBLE:
+to.getValue().setDouble(from.getValue().getDouble());
+break;
+  case TEXT:
+to.getValue().setBinary(from.getValue().getBinary());
+break;
+  case BOOLEAN:
+to.getValue().setBoolean(from.getValue().getBoolean());
+break;
+  default:
+throw new 
UnSupportedDataTypeException(String.valueOf(from.getValue().getDataType()));
+}
+  }
+
+  public static TimeValuePair getEmptyTimeValuePair(TSDataType dataType) {
+switch (dataType) {
+  case FLOAT:
+return new TimeValuePair(0, new TsFloat(0.0f));
 
 Review comment:
   Where? This is not a method like Collections.emptyMap() at all.
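
One reading of this reply: `Collections.emptyMap()` returns an immutable map, whereas the method in the diff returns a new object on every call. The sketch below contrasts the two behaviours; `MutablePair` and `emptyPair()` are hypothetical stand-ins for `TimeValuePair` and `getEmptyTimeValuePair`, not the actual classes.

```java
import java.util.Collections;
import java.util.Map;

final class EmptyFactoryDemo {

  // Hypothetical mutable holder standing in for TimeValuePair.
  static final class MutablePair {
    long timestamp;
    float value;
  }

  // Returns a new placeholder on every call; the caller is free to overwrite it.
  static MutablePair emptyPair() {
    return new MutablePair();
  }

  // Returns true if the JDK's empty map rejects modification.
  static boolean emptyMapIsImmutable() {
    Map<String, Integer> empty = Collections.emptyMap();
    try {
      empty.put("k", 1);
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(emptyMapIsImmutable());
    MutablePair pair = emptyPair();
    pair.timestamp = 42L; // allowed: the placeholder exists to be filled in later
    System.out.println(pair.timestamp);
  }
}
```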




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314577226
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
 ##
 @@ -31,6 +31,7 @@
   AUTHORIZATION_SERVICE("Authorization ServerService", ""),
   FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
   SYNC_SERVICE("SYNC ServerService", ""),
+  MERGE_SERVICE("Merge Manager", ""),
 
 Review comment:
   I would like to leave it as future work.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314577062
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
 ##
 @@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.chunkRelated;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TimeValuePairUtils;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+
+public class CachedDiskChunkReader implements IPointReader {
 
 Review comment:
   I cannot. The next() in DiskChunkReader does not throw IOException while the 
one in CachedDiskChunkReader does.
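
The constraint behind this reply is a Java rule: an overriding method may declare fewer checked exceptions than the method it overrides, but not more. A rough sketch with hypothetical types (`PointSource`, `InMemorySource`, `DiskBackedSource`), which are not the readers from this PR:

```java
import java.io.IOException;

final class ReaderDemo {
  public static void main(String[] args) throws IOException {
    InMemorySource memory = new InMemorySource();
    System.out.println(memory.next()); // no try/catch needed for this concrete type

    PointSource disk = new DiskBackedSource(1);
    System.out.println(disk.next());   // callers must handle or declare IOException
  }
}

// Hypothetical reader interface whose next() may throw IOException.
interface PointSource {
  long next() throws IOException;
}

// Narrowing the throws clause in an implementation is allowed.
class InMemorySource implements PointSource {
  private long current = 0;

  @Override
  public long next() {
    return current++;
  }
}

// Simulates a reader that can hit an I/O error, so it keeps "throws IOException".
// It could not extend InMemorySource and add the checked exception to next():
// the compiler rejects an override that throws a broader checked exception.
class DiskBackedSource implements PointSource {
  private final long limit;
  private long current = 0;

  DiskBackedSource(long limit) {
    this.limit = limit;
  }

  @Override
  public long next() throws IOException {
    if (current >= limit) {
      throw new IOException("no more data");
    }
    return current++;
  }
}
```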




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314576748
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
 ##
 @@ -109,52 +109,6 @@ public long assignJobId() {
 return jobId;
   }
 
-  /**
-   * Begin query and set query tokens of queryPaths. This method is used for 
projection
-   * calculation.
-   */
-  public void beginQueryOfGivenQueryPaths(long jobId, List queryPaths)
 
 Review comment:
   Updated. Tokens were used to prevent a merge from deleting files that are 
being queried; they have been replaced by the locks in TsFileResource.
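
The idea of guarding a file with a lock instead of query tokens can be sketched as below, assuming a read-write lock per file: queries share the read lock, and a merge may delete the file only once it obtains the write lock. `GuardedFile` and its methods are hypothetical and do not reproduce the actual `TsFileResource` API.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch, not the TsFileResource implementation.
final class GuardedFile {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private volatile boolean deleted = false;

  // Called by a query before it starts reading the file.
  void beginQuery() {
    lock.readLock().lock();
  }

  // Called by the same query thread once it has finished reading.
  void endQuery() {
    lock.readLock().unlock();
  }

  // Called by a merge; returns false and deletes nothing while any query holds the read lock.
  boolean tryDeleteAfterMerge() {
    if (!lock.writeLock().tryLock()) {
      return false;
    }
    try {
      deleted = true; // a real implementation would remove the file here
      return true;
    } finally {
      lock.writeLock().unlock();
    }
  }

  boolean isDeleted() {
    return deleted;
  }

  public static void main(String[] args) {
    GuardedFile file = new GuardedFile();
    file.beginQuery();
    System.out.println(file.tryDeleteAfterMerge()); // a query is still reading
    file.endQuery();
    System.out.println(file.tryDeleteAfterMerge()); // no readers remain
  }
}
```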




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314574919
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
 ##
 @@ -34,7 +35,7 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * FileReaderManager is a singleton, which is used to manage
+ * resource.getSeqFiles()ager is a singleton, which is used to manage
 
 Review comment:
   Fixed.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314574804
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 ##
 @@ -185,53 +204,95 @@ public StorageGroupProcessor(String systemInfoDir, 
String storageGroupName)
   private void recover() throws ProcessorException {
 logger.info("recover Storage Group  {}", storageGroupName);
 
-// collect TsFiles from sequential data directory
-List tsFiles = 
getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
-recoverSeqFiles(tsFiles);
-
-// collect TsFiles from unsequential data directory
-tsFiles = 
getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
-recoverUnseqFiles(tsFiles);
+try {
+  // collect TsFiles from sequential and unsequential data directory
+  List seqTsFiles = 
getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
+  List unseqTsFiles =
+  
getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+
+  recoverSeqFiles(seqTsFiles);
+  recoverUnseqFiles(unseqTsFiles);
+
+  String taskName = storageGroupName + "-" + System.currentTimeMillis();
+  File mergingMods = new File(storageGroupSysDir, 
MERGING_MODIFICAITON_FILE_NAME);
+  if (mergingMods.exists()) {
+mergingModification = new ModificationFile(storageGroupSysDir + 
File.separator + MERGING_MODIFICAITON_FILE_NAME);
+  }
+  RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, 
unseqTsFiles,
 
 Review comment:
   This process includes the repair of merged files, so it must be done before 
the database goes online. You can perform only the repair part by setting the 
config continue_merge_after_reboot to false, but it cannot be made asynchronous.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314574478
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
 ##
 @@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
+import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeFileTask merges the merge temporary files with the seqFiles. It either moves the merged
+ * chunks in the temp files into the seqFiles or moves the unmerged chunks into the merge temp
+ * files, depending on which kind of chunk is the majority.
+ */
+class MergeFileTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class);
+
+  private String taskName;
+  private MergeContext context;
+  private MergeLogger mergeLogger;
+  private MergeResource resource;
+  private List<TsFileResource> unmergedFiles;
+
+  MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
+      MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
+    this.taskName = taskName;
+    this.context = context;
+    this.mergeLogger = mergeLogger;
+    this.resource = resource;
+    this.unmergedFiles = unmergedSeqFiles;
+  }
+
+  void mergeFiles() throws IOException {
+    // decide whether to write the unmerged chunks to the merge files or to move the merged chunks
+    // back to the original seqFiles
+    if (logger.isInfoEnabled()) {
+      logger.info("{} starts to merge {} files", taskName, unmergedFiles.size());
+    }
+    long startTime = System.currentTimeMillis();
+    int cnt = 0;
+    for (TsFileResource seqFile : unmergedFiles) {
+      int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
+      int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
+      if (mergedChunkNum >= unmergedChunkNum) {
+        // move the unmerged data to the new file
+        if (logger.isInfoEnabled()) {
+          logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+              + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum,
+              unmergedChunkNum);
+        }
+        moveUnmergedToNew(seqFile);
+      } else {
+        // move the merged data to the old file
+        if (logger.isInfoEnabled()) {
+          logger.info("{} moving merged data of {} to the old file, {} merged chunks, {} "
+              + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum,
+              unmergedChunkNum);
+        }
+        moveMergedToOld(seqFile);
+      }
+      cnt++;
+      if (logger.isDebugEnabled()) {
+        logger.debug("{} has merged {}/{} files", taskName, cnt, unmergedFiles.size());
+      }
+    }
+    if (logger.isInfoEnabled()) {
+      logger.info("{} has merged all files after {}ms", taskName,
+          System.currentTimeMillis() - startTime);
+    }
+    mergeLogger.logMergeEnd();
+  }
+
+  private void moveMergedToOld(TsFileResource seqFile) 
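
The mergeFiles() loop quoted above chooses a direction for each seqFile by comparing its merged and unmerged chunk counts. The following is a minimal standalone sketch of that decision only, not the PR's code: the class name MergeDirectionSketch, the Direction enum and the sample file names are hypothetical, and only the `>=` comparison is taken from the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the per-file decision made in mergeFiles().
class MergeDirectionSketch {

  enum Direction { MOVE_UNMERGED_TO_NEW, MOVE_MERGED_TO_OLD }

  // Same comparison as in the quoted diff: when merged chunks are at least as
  // numerous as unmerged ones, the (fewer) unmerged chunks are moved.
  static Direction choose(int mergedChunkNum, int unmergedChunkNum) {
    return mergedChunkNum >= unmergedChunkNum
        ? Direction.MOVE_UNMERGED_TO_NEW
        : Direction.MOVE_MERGED_TO_OLD;
  }

  public static void main(String[] args) {
    // value = {mergedChunkNum, unmergedChunkNum}; the file names are made up
    Map<String, int[]> counts = new LinkedHashMap<>();
    counts.put("seq-1.tsfile", new int[] {8, 2});
    counts.put("seq-2.tsfile", new int[] {1, 9});
    counts.put("seq-3.tsfile", new int[] {3, 3});
    for (Map.Entry<String, int[]> e : counts.entrySet()) {
      int[] c = e.getValue();
      System.out.println(e.getKey() + " -> " + choose(c[0], c[1]));
    }
  }
}
```

Because the comparison is `>=`, a tie sends the unmerged chunks to the temp file; either way the smaller group of chunks is the one that gets copied.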

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314574372
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
 ##
 @@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import static org.apache.iotdb.db.utils.MergeUtils.writeBatchPoint;
+import static org.apache.iotdb.db.utils.MergeUtils.writeTVPair;
+import static org.apache.iotdb.db.utils.QueryUtils.modifyChunkMetaData;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.merge.selector.MergePathSelector;
+import org.apache.iotdb.db.engine.merge.selector.NaivePathSelector;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.db.utils.MergeUtils.MetaListEntry;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class MergeMultiChunkTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class);
+  private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig()
+      .getChunkMergePointThreshold();
+
+  private MergeLogger mergeLogger;
+  private List<Path> unmergedSeries;
+
+  private String taskName;
+  private MergeResource resource;
+  private TimeValuePair[] currTimeValuePairs;
+  private boolean fullMerge;
+
+  private MergeContext mergeContext;
+
+  private AtomicInteger mergedChunkNum = new AtomicInteger();
+  private AtomicInteger unmergedChunkNum = new AtomicInteger();
+  private int mergedSeriesCnt;
+  private double progress;
+
+  private int concurrentMergeSeriesNum;
+  private List<Path> currMergingPaths = new ArrayList<>();
+
+  MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger mergeLogger,
+      MergeResource mergeResource, boolean fullMerge, List<Path> unmergedSeries,
+      int concurrentMergeSeriesNum) {
+    this.mergeContext = context;
+    this.taskName = taskName;
+    this.mergeLogger = mergeLogger;
+    this.resource = mergeResource;
+    this.fullMerge = fullMerge;
+    this.unmergedSeries = unmergedSeries;
+    this.concurrentMergeSeriesNum = concurrentMergeSeriesNum;
+  }
+
+  void mergeSeries() throws IOException {
+    if (logger.isInfoEnabled()) {
+      logger.info("{} starts to merge {} series", taskName, unmergedSeries.size());
+    }
+    long startTime = System.currentTimeMillis();
+    for (TsFileResource seqFile : resource.getSeqFiles()) {
+      mergeContext.getUnmergedChunkStartTimes().put(seqFile, new HashMap<>());
+    }
+    // merge each series and write data into each seqFile's corresponding temp merge file
+    List<List<Path>> devicePaths = MergeUtils.splitPathsByDevice(unmergedSeries);
 
 

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314574266
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java
 ##
 @@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.recover;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+/**
+ * MergeLogger records the progress of a merge in file "merge.log" as text lines.
+ */
+public class MergeLogger {
+
+  public static final String MERGE_LOG_NAME = "merge.log";
+
+  static final String STR_SEQ_FILES = "seqFiles";
+  static final String STR_UNSEQ_FILES = "unseqFiles";
+  static final String STR_TIMESERIES = "timeseries";
+  static final String STR_START = "start";
+  static final String STR_END = "end";
+  static final String STR_ALL_TS_END = "all ts end";
+  static final String STR_MERGE_START = "merge start";
+  static final String STR_MERGE_END = "merge end";
+
+  private BufferedWriter logStream;
+
+  public MergeLogger(String storageGroupDir) throws IOException {
+    logStream = new BufferedWriter(new FileWriter(new File(storageGroupDir, MERGE_LOG_NAME), true));
+  }
+
+  public void close() throws IOException {
+    logStream.close();
+  }
+
+  public void logTSStart(List<Path> paths) throws IOException {
+    logStream.write(STR_START);
+    for (Path path : paths) {
+      logStream.write(" " + path.getFullPath());
+    }
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logFilePositionUpdate(File file) throws IOException {
+    logStream.write(String.format("%s %d", file.getAbsolutePath(), file.length()));
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logTSEnd() throws IOException {
+    logStream.write(STR_END);
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logAllTsEnd() throws IOException {
+    logStream.write(STR_ALL_TS_END);
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logFileMergeStart(File file, long position) throws IOException {
+    logStream.write(String.format("%s %d", file.getAbsolutePath(), position));
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logFileMergeEnd() throws IOException {
+    logStream.write(STR_END);
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logMergeEnd() throws IOException {
+    logStream.write(STR_MERGE_END);
+    logStream.newLine();
+    logStream.flush();
+  }
+
+  public void logFiles(MergeResource resource) throws IOException {
+    logSeqFiles(resource.getSeqFiles());
+    logUnseqFiles(resource.getUnseqFiles());
+  }
+
+  public void logAllTS(List<Path> paths) throws IOException {
 
 Review comment:
   The cost is negligible compared to the merge task itself, and it does not
last long. However, I now get the paths from MManager, so it is no longer
necessary to record them.
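
For readers following the thread, the logging pattern under discussion (one text line per progress event, flushed immediately, file opened in append mode) can be sketched roughly as below. This is a simplified illustration under stated assumptions, not the MergeLogger from the PR: the class name ProgressLogSketch, the demo() helper and the series name root.sg.d1.s1 are hypothetical, while the "start", "end" and "merge end" markers mirror the STR_START, STR_END and STR_MERGE_END constants in the quoted diff.

```java
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.List;

// Hypothetical, simplified stand-in for an append-only text progress log.
class ProgressLogSketch implements AutoCloseable {

  private final BufferedWriter out;

  ProgressLogSketch(File logFile) throws IOException {
    // 'true' opens the file in append mode, so an existing log is extended
    out = new BufferedWriter(new FileWriter(logFile, true));
  }

  // Write one event per line and flush right away. flush() hands the bytes to
  // the operating system; it does not force them to disk like an fsync would.
  void logLine(String line) throws IOException {
    out.write(line);
    out.newLine();
    out.flush();
  }

  @Override
  public void close() throws IOException {
    out.close();
  }

  // Writes three events to a temp file and reads them back.
  static List<String> demo() {
    try {
      File logFile = File.createTempFile("merge", ".log");
      logFile.deleteOnExit();
      try (ProgressLogSketch log = new ProgressLogSketch(logFile)) {
        log.logLine("start root.sg.d1.s1"); // series name is made up
        log.logLine("end");
        log.logLine("merge end");
      }
      return Files.readAllLines(logFile.toPath());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    demo().forEach(System.out::println);
  }
}
```

With this pattern, each event costs one short line and one flush.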




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314573246
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
 ##
 @@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeTask merges given seqFiles and unseqFiles into a new one, which basically consists of three
+ * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files
+ *        2. move the merged chunks in the temp files back to the seqFiles or move the unmerged
+ *        chunks in the seqFiles into temp files and replace the seqFiles with the temp files.
+ *        3. remove unseqFiles
+ */
+public class MergeTask implements Callable<Void> {
+
+  public static final String MERGE_SUFFIX = ".merge";
+  private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
+
+  MergeResource resource;
+  String storageGroupDir;
+  MergeLogger mergeLogger;
+  MergeContext mergeContext = new MergeContext();
+
+  private MergeCallback callback;
+  int concurrentMergeSeriesNum;
+  String taskName;
+  boolean fullMerge;
+
+  MergeTask(List<TsFileResource> seqFiles,
+      List<TsFileResource> unseqFiles, String storageGroupDir, MergeCallback callback,
+      String taskName, boolean fullMerge) {
+    this.resource = new MergeResource(seqFiles, unseqFiles);
+    this.storageGroupDir = storageGroupDir;
+    this.callback = callback;
+    this.taskName = taskName;
+    this.fullMerge = fullMerge;
+    this.concurrentMergeSeriesNum = 1;
+  }
+
+  public MergeTask(MergeResource mergeResource, String storageGroupDir, MergeCallback callback,
+      String taskName, boolean fullMerge, int concurrentMergeSeriesNum) {
+    this.resource = mergeResource;
+    this.storageGroupDir = storageGroupDir;
+    this.callback = callback;
+    this.taskName = taskName;
+    this.fullMerge = fullMerge;
+    this.concurrentMergeSeriesNum = concurrentMergeSeriesNum;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    try {
+      doMerge();
+    } catch (Exception e) {
+      logger.error("Runtime exception in merge {}", taskName, e);
+      cleanUp(false);
+      // call the callback to make sure the StorageGroup exits merging status, but pass two
+      // empty file lists to avoid files being deleted.
+      callback.call(Collections.emptyList(), Collections.emptyList(),
+          new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME));
+      throw e;
+    }
+    return null;
+  }
+
+  private void doMerge() throws IOException {
+    if (logger.isInfoEnabled()) {
+      logger.info("{} starts to merge {} seqFiles, {} unseqFiles", taskName,
+          resource.getSeqFiles().size(), resource.getUnseqFiles().size());
+    }
+    long startTime = System.currentTimeMillis();
+    long totalFileSize = MergeUtils.collectFileSizes(resource.getSeqFiles(),
+        resource.getUnseqFiles());
+    mergeLogger = new MergeLogger(storageGroupDir);
+
+    mergeLogger.logFiles(resource);
 
 Review comment:
   See StorageGroupProcessor.merge() for the answer.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314573126
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
 ##
 @@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MergeUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class);
+
+  private MergeUtils() {
+    // util class
+  }
+
+  public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) {
+    switch (chunkWriter.getDataType()) {
+      case TEXT:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
+        break;
+      case DOUBLE:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
+        break;
+      case BOOLEAN:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
+        break;
+      case INT64:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong());
+        break;
+      case INT32:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt());
+        break;
+      case FLOAT:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat());
+        break;
+      default:
+        throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType());
+    }
+  }
+
+  /**
+   * Collect all paths contained in all the SeqFiles and UnseqFiles in a merge and sort them
+   * before returning.
+   * @param resource
+   * @return all paths contained in the merge.
+   * @throws IOException
+   */
+  public static List<Path> collectPaths(MergeResource resource)
+      throws IOException {
+    Set<Path> pathSet = new HashSet<>();
+    for (TsFileResource tsFileResource : resource.getUnseqFiles()) {
+      TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource);
+      resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema());
+      pathSet.addAll(collectFileSeries(sequenceReader));
+    }
+    for (TsFileResource tsFileResource : resource.getSeqFiles()) {
+      TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource);
+      resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema());
+      pathSet.addAll(collectFileSeries(sequenceReader));
+    }
+    List<Path> ret = new ArrayList<>(pathSet);
+    ret.sort(Comparator.comparing(Path::getFullPath));
+    return ret;
+  }
+
+  private static List<Path> collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException {
+    TsFileMetaData metaData = sequenceReader.readFileMetadata();
+    Set<String> deviceIds = metaData.getDeviceMap().keySet();
+    Set<String> measurements = metaData.getMeasurementSchema().keySet();
+    List<Path> paths = new ArrayList<>();
+    for (String deviceId : deviceIds) {
+  for (String 

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314572799
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
 ##
 @@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MergeUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class);
+
+  private MergeUtils() {
+    // util class
+  }
+
+  public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) {
+    switch (chunkWriter.getDataType()) {
+      case TEXT:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
+        break;
+      case DOUBLE:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
+        break;
+      case BOOLEAN:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
+        break;
+      case INT64:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong());
+        break;
+      case INT32:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt());
+        break;
+      case FLOAT:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat());
+        break;
+      default:
+        throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType());
+    }
+  }
+
+  /**
+   * Collect all paths contained in all the SeqFiles and UnseqFiles in a merge and sort them
+   * before returning.
+   * @param resource
+   * @return all paths contained in the merge.
+   * @throws IOException
+   */
+  public static List<Path> collectPaths(MergeResource resource)
+      throws IOException {
+    Set<Path> pathSet = new HashSet<>();
+    for (TsFileResource tsFileResource : resource.getUnseqFiles()) {
+      TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource);
 
 Review comment:
   As I replied before: even if it can, the cost is negligible and not worth the coupling.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314572858
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
 ##
 @@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MergeUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class);
+
+  private MergeUtils() {
+    // util class
+  }
+
+  public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) {
+    switch (chunkWriter.getDataType()) {
+      case TEXT:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
+        break;
+      case DOUBLE:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
+        break;
+      case BOOLEAN:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
+        break;
+      case INT64:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong());
+        break;
+      case INT32:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt());
+        break;
+      case FLOAT:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat());
+        break;
+      default:
+        throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType());
+    }
+  }
+
+  /**
+   * Collect all paths contained in all the SeqFiles and UnseqFiles in a merge and sort them
+   * before returning.
+   * @param resource
+   * @return all paths contained in the merge.
+   * @throws IOException
+   */
+  public static List<Path> collectPaths(MergeResource resource)
+      throws IOException {
+    Set<Path> pathSet = new HashSet<>();
+    for (TsFileResource tsFileResource : resource.getUnseqFiles()) {
+      TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource);
+      resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema());
+      pathSet.addAll(collectFileSeries(sequenceReader));
+    }
+    for (TsFileResource tsFileResource : resource.getSeqFiles()) {
+      TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource);
 
 Review comment:
   See my earlier response.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314572600
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
 ##
 @@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeTask merges given seqFiles and unseqFiles into a new one, which basically consists of three
+ * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files
+ *        2. move the merged chunks in the temp files back to the seqFiles or move the unmerged
+ *        chunks in the seqFiles into temp files and replace the seqFiles with the temp files.
+ *        3. remove unseqFiles
+ */
+public class MergeTask implements Callable<Void> {
+
+  public static final String MERGE_SUFFIX = ".merge";
+  private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
+
+  MergeResource resource;
+  String storageGroupDir;
+  MergeLogger mergeLogger;
+  MergeContext mergeContext = new MergeContext();
+
+  private MergeCallback callback;
+  int concurrentMergeSeriesNum;
+  String taskName;
+  boolean fullMerge;
+
+  MergeTask(List<TsFileResource> seqFiles,
+      List<TsFileResource> unseqFiles, String storageGroupDir, MergeCallback callback,
+      String taskName, boolean fullMerge) {
+    this.resource = new MergeResource(seqFiles, unseqFiles);
+    this.storageGroupDir = storageGroupDir;
+    this.callback = callback;
+    this.taskName = taskName;
+    this.fullMerge = fullMerge;
+    this.concurrentMergeSeriesNum = 1;
+  }
+
+  public MergeTask(MergeResource mergeResource, String storageGroupDir, MergeCallback callback,
+      String taskName, boolean fullMerge, int concurrentMergeSeriesNum) {
+    this.resource = mergeResource;
+    this.storageGroupDir = storageGroupDir;
+    this.callback = callback;
+    this.taskName = taskName;
+    this.fullMerge = fullMerge;
+    this.concurrentMergeSeriesNum = concurrentMergeSeriesNum;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    try  {
+      doMerge();
+    } catch (Exception e) {
+      logger.error("Runtime exception in merge {}", taskName, e);
+      cleanUp(false);
+      // call the callback to make sure the StorageGroup exit merging status, but passing 2
+      // empty file lists to avoid files being deleted.
+      callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME));
 
 Review comment:
   When this happens, the MergeLogger cannot be closed correctly, which means 
some fatal error must have occurred. In that case further merges should not 
proceed, so it may be better not to call the callback at all.
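
   The two failure policies discussed here can be sketched as follows. This is 
only an illustration, not the code in the pull request: MergeCallbackSketch, 
MergeTaskSketch and the notifyOnFailure flag are hypothetical names, and file 
lists are reduced to lists of strings.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-in for the PR's MergeCallback; the real interface takes other types.
interface MergeCallbackSketch {
  void call(List<String> seqFiles, List<String> unseqFiles);
}

// Hypothetical, simplified task; only the failure handling is of interest.
class MergeTaskSketch implements Callable<Void> {
  private final List<String> seqFiles;
  private final List<String> unseqFiles;
  private final Runnable mergeBody;            // stands in for doMerge()
  private final MergeCallbackSketch callback;
  private final boolean notifyOnFailure;       // assumed switch between the two policies

  MergeTaskSketch(List<String> seqFiles, List<String> unseqFiles, Runnable mergeBody,
      MergeCallbackSketch callback, boolean notifyOnFailure) {
    this.seqFiles = seqFiles;
    this.unseqFiles = unseqFiles;
    this.mergeBody = mergeBody;
    this.callback = callback;
    this.notifyOnFailure = notifyOnFailure;
  }

  @Override
  public Void call() {
    try {
      mergeBody.run();
    } catch (RuntimeException e) {
      if (notifyOnFailure) {
        // Policy A: let the owner leave the merging state, but pass two empty
        // lists so that no file is deleted.
        callback.call(Collections.emptyList(), Collections.emptyList());
      }
      // Policy B (notifyOnFailure == false): do not call back, so the owner stays
      // in the merging state and no further merge is started after a fatal error.
      return null;
    }
    // Success: report the files that were actually merged.
    callback.call(seqFiles, unseqFiles);
    return null;
  }
}
```

   With policy B the storage group never leaves the merging state after a 
failure, which blocks later merges until someone intervenes; that is the 
behaviour argued for above.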


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314571495
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
 ##
 @@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
 
 Review comment:
   It is also used in selectors, so no.


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314571438
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
 ##
 @@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MergeUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class);
+
+  private MergeUtils() {
+    // util class
+  }
+
+  public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) {
+    switch (chunkWriter.getDataType()) {
+      case TEXT:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
+        break;
+      case DOUBLE:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
+        break;
+      case BOOLEAN:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
+        break;
+      case INT64:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong());
+        break;
+      case INT32:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt());
+        break;
+      case FLOAT:
+        chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat());
+        break;
+      default:
+        throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType());
+    }
+  }
+
+  /**
+   * Collect all paths contained in all the SeqFiles and UnseqFiles in a merge and sort them
+   * before return.
+   * @param resource
+   * @return all paths contained in the merge.
+   * @throws IOException
+   */
+  public static List<Path> collectPaths(MergeResource resource)
+      throws IOException {
+    Set<Path> pathSet = new HashSet<>();
+    for (TsFileResource tsFileResource : resource.getUnseqFiles()) {
+      TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource);
+      resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema());
+      pathSet.addAll(collectFileSeries(sequenceReader));
+    }
+    for (TsFileResource tsFileResource : resource.getSeqFiles()) {
+      TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource);
+      resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema());
+      pathSet.addAll(collectFileSeries(sequenceReader));
+    }
+    List<Path> ret = new ArrayList<>(pathSet);
+    ret.sort(Comparator.comparing(Path::getFullPath));
+    return ret;
+  }
+
+  private static List<Path> collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException {
+    TsFileMetaData metaData = sequenceReader.readFileMetadata();
+    Set<String> deviceIds = metaData.getDeviceMap().keySet();
+    Set<String> measurements = metaData.getMeasurementSchema().keySet();
+    List<Path> paths = new ArrayList<>();
+    for (String deviceId : deviceIds) {
+      for (String 

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314570343
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
 ##
 @@ -0,0 +1,354 @@

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314570087
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
 ##
 @@ -0,0 +1,354 @@

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314569883
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
 ##
 @@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader;
+import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeResource manages files and caches of readers, writers, MeasurementSchemas and
+ * modifications to avoid unnecessary object creations and file openings.
+ */
+public class MergeResource {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeResource.class);
+
+  private List<TsFileResource> seqFiles;
+  private List<TsFileResource> unseqFiles;
+
+  private QueryContext mergeContext = new QueryContext();
+
+  private Map<TsFileResource, TsFileSequenceReader> fileReaderCache;
 
 Review comment:
   This coupling brings hardly any benefit. It may save some time reading 
TsFileMetadata, but that saving is small compared to the cost of the MergeTask 
itself. The chance that you are merging a file that is also being queried is 
not large either, so I prefer to keep it decoupled from other modules.
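
   A task-local reader cache of the kind preferred here could look roughly 
like the sketch below. It is an illustration under assumptions, not the 
MergeResource code: LocalReaderCache and its methods are hypothetical names, 
and readers are modelled as plain java.io.Closeable objects.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical cache owned by one merge task and shared with no other module.
class LocalReaderCache<K, R extends Closeable> {
  private final Map<K, R> readers = new HashMap<>();
  private final Function<K, R> opener;   // how to open a reader for a file

  LocalReaderCache(Function<K, R> opener) {
    this.opener = opener;
  }

  // Open each file at most once for the lifetime of the task.
  R get(K file) {
    return readers.computeIfAbsent(file, opener);
  }

  int size() {
    return readers.size();
  }

  // Close every reader when the task ends; the first failure is rethrown
  // after all readers have been given a chance to close.
  void closeAll() {
    IOException first = null;
    for (R reader : readers.values()) {
      try {
        reader.close();
      } catch (IOException e) {
        if (first == null) {
          first = e;
        }
      }
    }
    readers.clear();
    if (first != null) {
      throw new UncheckedIOException(first);
    }
  }
}
```

   The price of this design is that a file being queried at the same time is 
opened twice, once by the query side and once by the merge.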


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314568335
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
 ##
 @@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader;
+import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeResource manages files and caches of readers, writers, MeasurementSchemas and
+ * modifications to avoid unnecessary object creations and file openings.
+ */
+public class MergeResource {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeResource.class);
+
+  private List<TsFileResource> seqFiles;
+  private List<TsFileResource> unseqFiles;
+
+  private QueryContext mergeContext = new QueryContext();
 
 Review comment:
   Removed.


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314568143
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java
 ##
 @@ -39,7 +41,7 @@
   /**
* key: Tsfile path. value: TsFileMetaData
*/
-  private LRULinkedHashMap<String, TsFileMetaData> cache;
+  private LRULinkedHashMap<TsFileResource, TsFileMetaData> cache;
 
 Review comment:
   We never have two TsFileResource objects for the same TsFile, so it is fine.
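
   Why this matters can be shown with a small sketch. It assumes the key 
class does not override equals/hashCode, so lookups go by object identity; 
FileHandle and LruCacheSketch are hypothetical stand-ins, not the 
TsFileResource or LRULinkedHashMap classes of the project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a per-file resource object. It deliberately keeps
// the default identity-based equals/hashCode.
class FileHandle {
  final String path;

  FileHandle(String path) {
    this.path = path;
  }
}

// Minimal LRU cache built on LinkedHashMap in access order.
class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
  private static final long serialVersionUID = 1L;
  private final int capacity;

  LruCacheSketch(int capacity) {
    super(16, 0.75f, true);   // true = iterate from least to most recently accessed
    this.capacity = capacity;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Evict the least recently used entry once the capacity is exceeded.
    return size() > capacity;
  }
}
```

   If a second FileHandle were created for the same path, it would miss the 
cache and occupy its own slot. The cache is therefore only correct as long as 
each file has exactly one resource object, which is the invariant stated above.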


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314568004
 
 

 ##
 File path: server/iotdb/conf/iotdb-engine.properties
 ##
 @@ -155,7 +155,43 @@ concurrent_flush_thread=0
 
 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory
 # (i.e., whether use ChunkBufferPool), value true, false
-chunk_buffer_pool_enable = false
+chunk_buffer_pool_enable=false
+
+
+### Merge Configurations
+
+
+# How many threads will be set up to perform merges, 1 by default.
+# Set to 1 when less than or equal to 0.
+merge_thread_num=1
+
+# How much memory may be used in ONE merge task (in bytes), 20% of maximum JVM memory by default.
+# This is only a rough estimation, starting from a relatively small value to avoid OOM.
+# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the
+# total memory estimation of merge.
+# merge_memory_budget=2147483648
+
+# When set to true, if some crashed merges are detected during system rebooting, such merges will
+# be continued, otherwise, the unfinished parts of such merges will not be continued while the
+# finished parts still remain as they are.
+# If you are feeling the rebooting is too slow, set this to false, false by default
+continue_merge_after_reboot=false
+
+# A global merge will be performed each such interval, that is, each storage group will be merged
+# (if proper merge candidates can be found). Unit: second, default: 1 hour.
+# When less than or equal to 0, timed merge is disabled.
+merge_interval_sec=3600
+
+# When set to true, all merges become full merges (the whole SeqFiles are re-written despite how
 
 Review comment:
   No overflow, no merge. The main and most certain benefit of merging is to 
make the unsequential files sequential.
   Physically removing deleted data is a reasonable goal, but it would be 
considered future work.
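
   The rule "no overflow, no merge" can be sketched as below. This is a toy 
model, not the merge implementation: MergeSketch is a hypothetical name, a 
series is a map from timestamp to value, and it assumes that an unsequential 
point replaces a sequential point with the same timestamp.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of merging out-of-order (unsequential) points into ordered data.
class MergeSketch {

  // Returns the input map itself when there is nothing unsequential, i.e. no
  // rewrite happens; otherwise returns a new time-ordered map.
  static TreeMap<Long, Double> merge(TreeMap<Long, Double> seq,
      List<Map.Entry<Long, Double>> unseq) {
    if (unseq.isEmpty()) {
      return seq;   // no overflow, no merge
    }
    TreeMap<Long, Double> merged = new TreeMap<>(seq);
    for (Map.Entry<Long, Double> point : unseq) {
      // Assumption: the unsequential write wins on equal timestamps.
      merged.put(point.getKey(), point.getValue());
    }
    return merged;
  }
}
```

   Dropping deleted data during the rewrite would be an extra filtering step 
on top of this, which is the part deferred to future work.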


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314567286
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
 ##
 @@ -229,6 +230,45 @@
*/
   private boolean chunkBufferPoolEnable = false;
 
+  /**
+   * How much memory (in bytes) can be used by a single merge task.
+   */
+  private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2);
+
+  /**
+   * How many threads will be set up to perform merges.
+   */
+  private int mergeThreadNum = 1;
+
+  /**
+   * When set to true, if some crashed merges are detected during system rebooting, such merges will
+   * be continued, otherwise, the unfinished parts of such merges will not be continued while the
+   * finished parts still remain as they are.
+   */
+  private boolean continueMergeAfterReboot = true;
+
+  /**
+   * A global merge will be performed each such interval, that is, each storage group will be merged
+   * (if proper merge candidates can be found). Unit: second.
+   */
+  private long mergeIntervalSec = 2 * 3600L;
+
+  /**
+   * When set to true, all merges become full merges (the whole SeqFiles are re-written despite how
+   * much they are overflowed). This may increase merge overhead depending on how much the SeqFiles
+   * are overflowed.
+   */
+  private boolean forceFullMerge = false;
+
+  /**
+   * During a merge, if a chunk has fewer points than this parameter, the chunk will be
+   * merged with its succeeding chunks even if it is not overflowed, until the merged chunks reach
+   * this threshold and the new chunk will be flushed.
+   */
+  private int chunkMergePointThreshold = 512;
+
+  private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.MAX_SERIES_NUM;
 
 Review comment:
   This parameter is experimental and not intended for users. Developers can 
change it by editing the source code.
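
   A sketch of that split between user-facing and code-level settings follows. 
It is not the IoTDBConfig/IoTDBDescriptor code: MergeConfigSketch is a 
hypothetical class, the strategy is kept as a plain string, the property key 
merge_file_strategy used in the usage note is invented for illustration, and 
the defaults are taken from the properties file quoted in this thread.

```java
import java.util.Properties;

// Hypothetical holder for the merge settings discussed in this thread.
class MergeConfigSketch {
  int mergeThreadNum = 1;
  long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2);
  long mergeIntervalSec = 3600L;
  // Experimental: intentionally never read from the properties file.
  String mergeFileStrategy = "MAX_SERIES_NUM";

  static MergeConfigSketch load(Properties props) {
    MergeConfigSketch conf = new MergeConfigSketch();
    int threads = Integer.parseInt(props.getProperty("merge_thread_num", "1").trim());
    conf.mergeThreadNum = threads <= 0 ? 1 : threads;   // "set to 1 when <= 0"
    String budget = props.getProperty("merge_memory_budget");
    if (budget != null) {
      conf.mergeMemoryBudget = Long.parseLong(budget.trim());
    }
    conf.mergeIntervalSec =
        Long.parseLong(props.getProperty("merge_interval_sec", "3600").trim());
    return conf;
  }

  // Timed merge is disabled when the interval is less than or equal to 0.
  boolean timedMergeEnabled() {
    return mergeIntervalSec > 0;
  }

  // Rough upper estimate: every merge thread may use the full per-task budget.
  long totalMemoryEstimate() {
    return (long) mergeThreadNum * mergeMemoryBudget;
  }
}
```

   Setting merge_file_strategy in the properties file has no effect in this 
sketch, which mirrors the intent that only developers change the strategy.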


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-15 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314566945
 
 

 ##
 File path: server/iotdb/conf/iotdb-engine.properties
 ##
 @@ -155,7 +155,43 @@ concurrent_flush_thread=0
 
 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory
 # (i.e., whether use ChunkBufferPool), value true, false
-chunk_buffer_pool_enable = false
+chunk_buffer_pool_enable=false
+
+
+### Merge Configurations
+
+
+# How many thread will be set up to perform merges, 1 by default.
+# Set to 1 when less than or equal to 0.
+merge_thread_num=1
+
+# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default.
+# This is only a rough estimation, starting from a relatively small value to avoid OOM.
+# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the
+# total memory estimation of merge.
+# merge_memory_budget=2147483648
+
+# When set to true, if some crashed merges are detected during system rebooting, such merges will
+# be continued, otherwise, the unfinished parts of such merges will not be continued while the
+# finished parts still remains as they are.
+# If you are feeling the rebooting is too slow, set this to false, false by default
+continue_merge_after_reboot=false
+
+# A global merge will be performed each such interval, that is, each storage group will be merged
+# (if proper merge candidates can be found). Unit: second, default: 1hours.
+# When less than or equal to 0, timed merge is disabled.
+merge_interval_sec=3600
+
+# When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how
+# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles
+# are overflowed.
+force_full_merge=false
+
+# During a merge, if a chunk with less number of chunks than this parameter, the chunk will be
 
 Review comment:
   fixed
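
   The rules stated in the quoted property comments (clamp the thread count to 1, treat a non-positive interval as "timed merge disabled", estimate total memory as threads times per-task budget) can be sketched as below. This is only an illustration: `MergeSettings` and its members are hypothetical names, not classes from the pull request.

```java
/** Hypothetical holder for the merge settings described above; not IoTDB code. */
final class MergeSettings {
  final int threadNum;
  final long memoryBudgetPerTask;
  final long intervalSec;

  MergeSettings(int threadNum, long memoryBudgetPerTask, long intervalSec) {
    // "Set to 1 when less than or equal to 0."
    this.threadNum = threadNum <= 0 ? 1 : threadNum;
    this.memoryBudgetPerTask = memoryBudgetPerTask;
    this.intervalSec = intervalSec;
  }

  /** Rough estimate: each merge thread may use one full per-task budget. */
  long totalMemoryEstimate() {
    return threadNum * memoryBudgetPerTask;
  }

  /** "When less than or equal to 0, timed merge is disabled." */
  boolean timedMergeEnabled() {
    return intervalSec > 0;
  }
}
```

   With four threads and the commented-out example budget of 2147483648 bytes, the estimate is 8 GiB, which shows why the comment suggests starting from a small value.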


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-08-13 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313672394
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
 ##
 @@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeTask merges given seqFiles and unseqFiles into a new one, which basically consists of three
+ * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files
+ *        2. move the merged chunks in the temp files back to the seqFiles or move the unmerged
+ *           chunks in the seqFiles int temp files and replace the seqFiles with the temp files.
 
 Review comment:
   fixed


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-26 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r307677023
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
 ##
 @@ -97,6 +98,7 @@ private void setUp() throws StartupException {
 registerManager.register(Measurement.INSTANCE);
 registerManager.register(SyncServerManager.getInstance());
 registerManager.register(TVListAllocator.getInstance());
+registerManager.register(MergeManager.getINSTANCE());
 
 Review comment:
   It's okay, since RecoverMergeTasks are not submitted to the MergeManager; they are run synchronously instead. Imagine what would happen if the timed merge task were triggered before the StorageEngine is set up: things would become a mess.
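
   The ordering concern can be illustrated with a small sketch. It assumes that registering a service also starts it, so registration order is start order; `SimpleService` and `OrderedRegistry` are hypothetical stand-ins, not the IService or RegisterManager classes from the code base.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a startable service. */
interface SimpleService {
  void start();
}

/** Hypothetical registry: registering a service starts it right away. */
final class OrderedRegistry {
  private final List<String> started = new ArrayList<>();

  void register(String name, SimpleService service) {
    service.start();   // a service registered later can rely on earlier ones
    started.add(name);
  }

  /** Returns the names of started services in start order. */
  List<String> startedOrder() {
    return new ArrayList<>(started);
  }
}
```

   Under that assumption, a service whose timer depends on the storage engine has to be registered after the engine; otherwise its first scheduled run could see a half-initialized engine.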


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-26 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r307674702
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
 ##
 @@ -97,6 +98,7 @@ private void setUp() throws StartupException {
 registerManager.register(Measurement.INSTANCE);
 registerManager.register(SyncServerManager.getInstance());
 registerManager.register(TVListAllocator.getInstance());
+registerManager.register(MergeManager.getINSTANCE());
 
 Review comment:
   fixed


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-26 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r307673734
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
 ##
 @@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeFileTask merges the merge temporary files with the seqFiles, either move the merged
+ * chunks in the temp files into the seqFiles or move the unmerged chunks into the merge temp
+ * files, depending on which one is the majority.
+ */
+class MergeFileTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class);
+
+  private String taskName;
+  private MergeContext context;
+  private MergeLogger mergeLogger;
+  private MergeResource resource;
+  private List<TsFileResource> unmergedFiles;
+
+  MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
+      MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
+    this.taskName = taskName;
+    this.context = context;
+    this.mergeLogger = mergeLogger;
+    this.resource = resource;
+    this.unmergedFiles = unmergedSeqFiles;
+  }
+
+  void mergeFiles() throws IOException {
+    // decide whether to write the unmerged chunks to the merge files or to move the merged chunks
+    // back to the origin seqFile's
+    if (logger.isInfoEnabled()) {
+      logger.info("{} starts to merge {} files", taskName, unmergedFiles.size());
+    }
+    long startTime = System.currentTimeMillis();
+    int cnt = 0;
+    for (TsFileResource seqFile : unmergedFiles) {
+      int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
+      int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
+      if (mergedChunkNum >= unmergedChunkNum) {
+        // move the unmerged data to the new file
+        if (logger.isInfoEnabled()) {
+          logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+              + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum);
+        }
+        moveUnmergedToNew(seqFile);
+      } else {
+        // move the merged data to the old file
+        if (logger.isInfoEnabled()) {
 
 Review comment:
   If you are not okay with that, just turn on the full merge option from the beginning; otherwise, you are just making the previous merges do useless work. We may add further compactions in the future, but that is not the concern here.


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-26 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r307673734
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
 ##
 @@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeFileTask merges the merge temporary files with the seqFiles, either move the merged
+ * chunks in the temp files into the seqFiles or move the unmerged chunks into the merge temp
+ * files, depending on which one is the majority.
+ */
+class MergeFileTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class);
+
+  private String taskName;
+  private MergeContext context;
+  private MergeLogger mergeLogger;
+  private MergeResource resource;
+  private List<TsFileResource> unmergedFiles;
+
+  MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
+      MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
+    this.taskName = taskName;
+    this.context = context;
+    this.mergeLogger = mergeLogger;
+    this.resource = resource;
+    this.unmergedFiles = unmergedSeqFiles;
+  }
+
+  void mergeFiles() throws IOException {
+    // decide whether to write the unmerged chunks to the merge files or to move the merged chunks
+    // back to the origin seqFile's
+    if (logger.isInfoEnabled()) {
+      logger.info("{} starts to merge {} files", taskName, unmergedFiles.size());
+    }
+    long startTime = System.currentTimeMillis();
+    int cnt = 0;
+    for (TsFileResource seqFile : unmergedFiles) {
+      int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
+      int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
+      if (mergedChunkNum >= unmergedChunkNum) {
+        // move the unmerged data to the new file
+        if (logger.isInfoEnabled()) {
+          logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+              + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum);
+        }
+        moveUnmergedToNew(seqFile);
+      } else {
+        // move the merged data to the old file
+        if (logger.isInfoEnabled()) {
 
 Review comment:
   If you are not okay with that, just turn on the full merge option from the beginning. We may add further compactions in the future, but that is not the concern here.


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-26 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r307675040
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
 ##
 @@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeManager provides a ThreadPool to queue and run all merge tasks to restrain the total
+ * resources occupied by merge and manages a Timer to periodically issue a global merge.
+ */
+public class MergeManager implements IService {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);
+  private static final MergeManager INSTANCE = new MergeManager();
+
+  private AtomicInteger threadCnt = new AtomicInteger();
+  private ThreadPoolExecutor mergeTaskPool;
+  private ScheduledExecutorService timedMergeThreadPool;
+
+  private MergeManager() {
+  }
 
 Review comment:
   replied previously


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-26 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r307675040
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
 ##
 @@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeManager provides a ThreadPool to queue and run all merge tasks to restrain the total
+ * resources occupied by merge and manages a Timer to periodically issue a global merge.
+ */
+public class MergeManager implements IService {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);
+  private static final MergeManager INSTANCE = new MergeManager();
+
+  private AtomicInteger threadCnt = new AtomicInteger();
+  private ThreadPoolExecutor mergeTaskPool;
+  private ScheduledExecutorService timedMergeThreadPool;
+
+  private MergeManager() {
+  }
 
 Review comment:
   fixed


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-26 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r307674976
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
 ##
 @@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeManager provides a ThreadPool to queue and run all merge tasks to restrain the total
+ * resources occupied by merge and manages a Timer to periodically issue a global merge.
+ */
+public class MergeManager implements IService {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);
+  private static final MergeManager INSTANCE = new MergeManager();
+
+  private AtomicInteger threadCnt = new AtomicInteger();
+  private ThreadPoolExecutor mergeTaskPool;
+  private ScheduledExecutorService timedMergeThreadPool;
+
+  private MergeManager() {
+  }
+
+  public static MergeManager getINSTANCE() {
+    return INSTANCE;
+  }
+
+  public void submit(MergeTask mergeTask) {
+    mergeTaskPool.submit(mergeTask);
+  }
+
+  @Override
+  public void start() {
+    if (mergeTaskPool == null) {
+      int threadNum = IoTDBDescriptor.getInstance().getConfig().getMergeConcurrentThreads();
+      if (threadNum <= 0) {
+        threadNum = 1;
+      }
+      mergeTaskPool =
+          (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum,
+              r -> new Thread(r, "MergeThread-" + threadCnt.getAndIncrement()));
+      long mergeInterval = IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec();
+      if (mergeInterval > 0) {
+        timedMergeThreadPool = Executors.newSingleThreadScheduledExecutor( r -> new Thread(r,
+            "TimedMergeThread"));
+        timedMergeThreadPool.scheduleAtFixedRate(this::flushAll, mergeInterval,
+            mergeInterval, TimeUnit.SECONDS);
+      }
+      logger.info("MergeManager started");
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (mergeTaskPool != null) {
+      if (timedMergeThreadPool != null) {
+        timedMergeThreadPool.shutdownNow();
+        timedMergeThreadPool = null;
+      }
+      mergeTaskPool.shutdownNow();
+      logger.info("Waiting for task pool to shut down");
+      while (!mergeTaskPool.isShutdown()) {
 
 Review comment:
   fixed
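
   The thread does not show what the fix was. For context: `ExecutorService.isShutdown()` returns true as soon as shutdown has been requested, so a loop on it right after `shutdownNow()` does not actually wait for running tasks. One common way to wait is `awaitTermination`, sketched below; `PoolShutdown` and `shutdownAndWait` are hypothetical names, and this is not necessarily what the pull request ended up doing.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing a bounded wait for pool termination. */
final class PoolShutdown {

  /** Returns true if all tasks finished within the timeout. */
  static boolean shutdownAndWait(ExecutorService pool, long timeout, TimeUnit unit) {
    pool.shutdownNow();   // interrupts running tasks and drops queued ones
    try {
      return pool.awaitTermination(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();   // preserve the interrupt for callers
      return false;
    }
  }
}
```

   A caller would typically log a warning when the method returns false instead of spinning until the pool terminates.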


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-26 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r307673734
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
 ##
 @@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeFileTask merges the merge temporary files with the seqFiles, either move the merged
+ * chunks in the temp files into the seqFiles or move the unmerged chunks into the merge temp
+ * files, depending on which one is the majority.
+ */
+class MergeFileTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class);
+
+  private String taskName;
+  private MergeContext context;
+  private MergeLogger mergeLogger;
+  private MergeResource resource;
+  private List<TsFileResource> unmergedFiles;
+
+  MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
+      MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
+    this.taskName = taskName;
+    this.context = context;
+    this.mergeLogger = mergeLogger;
+    this.resource = resource;
+    this.unmergedFiles = unmergedSeqFiles;
+  }
+
+  void mergeFiles() throws IOException {
+    // decide whether to write the unmerged chunks to the merge files or to move the merged chunks
+    // back to the origin seqFile's
+    if (logger.isInfoEnabled()) {
+      logger.info("{} starts to merge {} files", taskName, unmergedFiles.size());
+    }
+    long startTime = System.currentTimeMillis();
+    int cnt = 0;
+    for (TsFileResource seqFile : unmergedFiles) {
+      int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
+      int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
+      if (mergedChunkNum >= unmergedChunkNum) {
+        // move the unmerged data to the new file
+        if (logger.isInfoEnabled()) {
+          logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+              + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum);
+        }
+        moveUnmergedToNew(seqFile);
+      } else {
+        // move the merged data to the old file
+        if (logger.isInfoEnabled()) {
 
 Review comment:
   If you are not okay with that, just turn on the full merge option from the beginning. You may add further compactions in the future, but that is not the concern here.


[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-23 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r306609546
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxSeriesMergeFileSelector.java
 ##
 @@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.selector;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.MergeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MaxSeriesMergeFileSelector is an extension of MergeFileSelector which tries to maximize the
+ * number of timeseries that can be merged at the same time.
+ */
+public class MaxSeriesMergeFileSelector extends MergeFileSelector {
+
+  public static final int MAX_SERIES_NUM = 1024;
+  private static final Logger logger = LoggerFactory.getLogger(MaxSeriesMergeFileSelector.class);
+
+  private List<TsFileResource> lastSelectedSeqFiles;
+  private List<TsFileResource> lastSelectedUnseqFiles;
 
 Review comment:
   fixed




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-23 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r306607986
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
 ##
 @@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeFileTask merges the merge temporary files with the seqFiles, either move the merged
+ * chunks in the temp files into the seqFiles or move the unmerged chunks into the merge temp
+ * files, depending on which one is the majority.
+ */
+class MergeFileTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class);
+
+  private String taskName;
+  private MergeContext context;
+  private MergeLogger mergeLogger;
+  private MergeResource resource;
+  private List<TsFileResource> unmergedFiles;
+
+  MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
+      MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
+    this.taskName = taskName;
+    this.context = context;
+    this.mergeLogger = mergeLogger;
+    this.resource = resource;
+    this.unmergedFiles = unmergedSeqFiles;
+  }
+
+  void mergeFiles() throws IOException {
+    // decide whether to write the unmerged chunks to the merge files or to move the merged chunks
+    // back to the origin seqFile's
+    if (logger.isInfoEnabled()) {
+      logger.info("{} starts to merge {} files", taskName, unmergedFiles.size());
+    }
+    long startTime = System.currentTimeMillis();
+    int cnt = 0;
+    for (TsFileResource seqFile : unmergedFiles) {
+      int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
+      int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
+      if (mergedChunkNum >= unmergedChunkNum) {
+        // move the unmerged data to the new file
+        if (logger.isInfoEnabled()) {
+          logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+              + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum);
+        }
+        moveUnmergedToNew(seqFile);
+      } else {
+        // move the merged data to the old file
+        if (logger.isInfoEnabled()) {
+          logger.info("{} moving merged data of {} to the old file {} merged chunks, {} "
+              + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum);
+        }
+        moveMergedToOld(seqFile);
+      }
+      cnt ++;
+      if (logger.isInfoEnabled()) {
+        logger.debug("{} has merged {}/{} files", taskName, cnt, unmergedFiles.size());
+      }
+    }
+    if (logger.isInfoEnabled()) {
+      logger.info("{} has merged all files after {}ms", taskName, System.currentTimeMillis() - startTime);
+    }
+    mergeLogger.logMergeEnd();
+  }
+
+  private void moveMergedToOld(TsFileResource seqFile) throws IOException {
+    int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
+    if (mergedChunkNum == 

[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-23 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r306607818
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
 ##
 @@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeFileTask merges the merge temporary files with the seqFiles, either move the merged
+ * chunks in the temp files into the seqFiles or move the unmerged chunks into the merge temp
+ * files, depending on which one is the majority.
+ */
+class MergeFileTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class);
+
+  private String taskName;
+  private MergeContext context;
+  private MergeLogger mergeLogger;
+  private MergeResource resource;
+  private List<TsFileResource> unmergedFiles;
+
+  MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
+      MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
+    this.taskName = taskName;
+    this.context = context;
+    this.mergeLogger = mergeLogger;
+    this.resource = resource;
+    this.unmergedFiles = unmergedSeqFiles;
+  }
+
+  void mergeFiles() throws IOException {
+    // decide whether to write the unmerged chunks to the merge files or to move the merged chunks
+    // back to the origin seqFile's
+    if (logger.isInfoEnabled()) {
+      logger.info("{} starts to merge {} files", taskName, unmergedFiles.size());
+    }
+    long startTime = System.currentTimeMillis();
+    int cnt = 0;
+    for (TsFileResource seqFile : unmergedFiles) {
+      int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
+      int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
+      if (mergedChunkNum >= unmergedChunkNum) {
+        // move the unmerged data to the new file
+        if (logger.isInfoEnabled()) {
+          logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+              + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum);
+        }
+        moveUnmergedToNew(seqFile);
+      } else {
+        // move the merged data to the old file
+        if (logger.isInfoEnabled()) {
 
 Review comment:
   I have provided a full merge option, which you may use if your disk space 
is limited.




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-23 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r306607529
 
 

 ##
 File path: 
server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
 ##
 @@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeFileTask merges the merge temporary files with the seqFiles, either move the merged
+ * chunks in the temp files into the seqFiles or move the unmerged chunks into the merge temp
+ * files, depending on which one is the majority.
+ */
+class MergeFileTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class);
+
+  private String taskName;
+  private MergeContext context;
+  private MergeLogger mergeLogger;
+  private MergeResource resource;
+  private List<TsFileResource> unmergedFiles;
+
+  MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger,
+      MergeResource resource, List<TsFileResource> unmergedSeqFiles) {
 
 Review comment:
   They are not always the same in a RecoverMergeTask, because some files may 
already have been merged before the system went down.
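
   To make the recovery case concrete, here is a rough sketch of why the list
passed to the constructor can be shorter than the full seqFile list. How
RecoverMergeTask actually determines which files are finished is not shown in
this thread, so the class name, the method, and the finishedFiles set below
are hypothetical stand-ins for whatever the merge log records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: which seqFiles still need the file-merge step after a restart. */
final class RecoverSketch {

  /**
   * Returns the entries of allSeqFiles that are not in finishedFiles,
   * preserving the original order. finishedFiles is an assumed input that
   * stands for the files recorded as merged before the shutdown.
   */
  static List<String> remainingFiles(List<String> allSeqFiles, Set<String> finishedFiles) {
    List<String> remaining = new ArrayList<>();
    for (String file : allSeqFiles) {
      if (!finishedFiles.contains(file)) {
        remaining.add(file);
      }
    }
    return remaining;
  }
}
```

   In a fresh merge the finished set would be empty and both lists coincide;
after a crash part-way through, only the remainder needs to be handed to
MergeFileTask.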




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-23 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r306607193
 
 

 ##
 File path: 
iotdb/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeContext.java
 ##
 @@ -0,0 +1,60 @@
+package org.apache.iotdb.db.engine.merge.manage;
 
 Review comment:
   fixed




[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge

2019-07-23 Thread GitBox
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of 
merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r306607041
 
 

 ##
 File path: 
iotdb/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
 ##
 @@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeTask merges given seqFiles and unseqFiles into a new one, which basically consists of three
+ * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files
+ *        2. move the merged chunks in the temp files back to the seqFiles or move the unmerged
+ *        chunks in the seqFiles into temp files and replace the seqFiles with the temp files.
+ *        3. remove unseqFiles
+ */
+public class MergeTask implements Callable<Void> {
+
+  public static final String MERGE_SUFFIX = ".merge";
+  private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
+
+  MergeResource resource;
+  String storageGroupDir;
+  MergeLogger mergeLogger;
+  MergeContext mergeContext = new MergeContext();
+
+  private MergeCallback callback;
+  int concurrentMergeSeriesNum;
+  String taskName;
+  boolean fullMerge;
+
+  public MergeTask(List<TsFileResource> seqFiles,
+      List<TsFileResource> unseqFiles, String storageGroupDir, MergeCallback callback,
+      String taskName, boolean fullMerge) {
+    this.resource = new MergeResource(seqFiles, unseqFiles);
+    this.storageGroupDir = storageGroupDir;
+    this.callback = callback;
+    this.taskName = taskName;
+    this.fullMerge = fullMerge;
+    this.concurrentMergeSeriesNum = 1;
+  }
+
+  public MergeTask(List<TsFileResource> seqFiles,
+      List<TsFileResource> unseqFiles, String storageGroupDir, MergeCallback callback,
+      String taskName, boolean fullMerge, int concurrentMergeSeriesNum) {
+    this.resource = new MergeResource(seqFiles, unseqFiles);
+    this.storageGroupDir = storageGroupDir;
+    this.callback = callback;
+    this.taskName = taskName;
+    this.fullMerge = fullMerge;
+    this.concurrentMergeSeriesNum = concurrentMergeSeriesNum;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    try {
+      doMerge();
+    } catch (Exception e) {
+      logger.error("Runtime exception in merge {}", taskName, e);
+      cleanUp(false);
+      // call the callback to make sure the StorageGroup exit merging status, but passing 2
+      // empty file lists to avoid files being deleted.
+      callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME));
+      throw e;
+    }
+    return null;
+  }
+
+  private void doMerge() throws IOException {
+    if (logger.isInfoEnabled()) {
+      logger.info("{} starts to merge {} seqFiles, {} unseqFiles", taskName,
+          resource.getSeqFiles().size(), resource.getUnseqFiles().size());
+    }
+    long startTime = System.currentTimeMillis();
+    long totalFileSize = MergeUtils.collectFileSizes(resource.getSeqFiles(),
+        resource.getUnseqFiles());
+    mergeLogger = new MergeLogger(storageGroupDir);
+
+    mergeLogger.logFiles(resource);
+
+    List<Path> unmergedSeries = MergeUtils.collectPaths(resource);
+    unmergedSeries.sort(Comparator.comparing(Path::getFullPath));
 
 Review comment:
   I removed the sort here, but I prefer to explain it in the method documentation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this