[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312745476 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312740345 ## File path: server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java ## @@ -39,7 +41,7 @@ /** * key: Tsfile path. value: TsFileMetaData */ - private LRULinkedHashMap cache; + private LRULinkedHashMap cache; Review comment: Is it OK to use TsFileResource here? (You need to make sure that the instances are the same in memory.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312738934 ## File path: server/iotdb/conf/iotdb-engine.properties ## @@ -155,7 +155,43 @@ concurrent_flush_thread=0 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory # (i.e., whether use ChunkBufferPool), value true, false -chunk_buffer_pool_enable = false +chunk_buffer_pool_enable=false + + +### Merge Configurations + + +# How many thread will be set up to perform merges, 1 by default. +# Set to 1 when less than or equal to 0. +merge_thread_num=1 + +# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default. +# This is only a rough estimation, starting from a relatively small value to avoid OOM. +# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the +# total memory estimation of merge. +# merge_memory_budget=2147483648 + +# When set to true, if some crashed merges are detected during system rebooting, such merges will +# be continued, otherwise, the unfinished parts of such merges will not be continued while the +# finished parts still remains as they are. +# If you are feeling the rebooting is too slow, set this to false, false by default +continue_merge_after_reboot=false + +# A global merge will be performed each such interval, that is, each storage group will be merged +# (if proper merge candidates can be found). Unit: second, default: 1hours. +# When less than or equal to 0, timed merge is disabled. +merge_interval_sec=3600 + +# When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how +# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles +# are overflowed. 
+force_full_merge=false + +# During a merge, if a chunk with less number of chunks than this parameter, the chunk will be Review comment: > if a chunk with less number of chunks than this parameter "if a chunk with less number of points than this parameter"? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314345112 ## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetaData.java ## @@ -151,6 +159,8 @@ public static TsFileMetaData deserializeFrom(ByteBuffer buffer) throws IOExcepti if (ReadWriteIOUtils.readIsNull(buffer)) { fileMetaData.createdBy = ReadWriteIOUtils.readString(buffer); } +fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(buffer); Review comment: to keep compatibility, you'd better deserialize like this: ``` if(tsfile.version ==0.8.0) { totalChunkNumber = ?; invalidChunkNum = 0; } else{ fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(buffer); ... } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313206195 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312738889 ## File path: server/iotdb/conf/iotdb-engine.properties ## @@ -155,7 +155,43 @@ concurrent_flush_thread=0 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory # (i.e., whether use ChunkBufferPool), value true, false -chunk_buffer_pool_enable = false +chunk_buffer_pool_enable=false + + +### Merge Configurations + + +# How many thread will be set up to perform merges, 1 by default. +# Set to 1 when less than or equal to 0. +merge_thread_num=1 + +# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default. +# This is only a rough estimation, starting from a relatively small value to avoid OOM. +# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the +# total memory estimation of merge. +# merge_memory_budget=2147483648 + +# When set to true, if some crashed merges are detected during system rebooting, such merges will +# be continued, otherwise, the unfinished parts of such merges will not be continued while the +# finished parts still remains as they are. +# If you are feeling the rebooting is too slow, set this to false, false by default +continue_merge_after_reboot=false Review comment: hard to understand: If it is false, will the unfinished task be continued after startup? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314345452 ## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetaData.java ## @@ -108,6 +114,8 @@ public static TsFileMetaData deserializeFrom(InputStream inputStream) throws IOE if (ReadWriteIOUtils.readIsNull(inputStream)) { fileMetaData.createdBy = ReadWriteIOUtils.readString(inputStream); } +fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(inputStream); Review comment: keep compatibility This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314338342 ## File path: server/src/main/java/org/apache/iotdb/db/service/ServiceType.java ## @@ -31,6 +31,7 @@ AUTHORIZATION_SERVICE("Authorization ServerService", ""), FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""), SYNC_SERVICE("SYNC ServerService", ""), + MERGE_SERVICE("Merge Manager", ""), Review comment: It would be better if you added a JMX module for this task. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314340941 ## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java ## @@ -135,9 +135,9 @@ */ public static int pageCheckSizeThreshold = 100; /** - * Default endian value is LITTLE_ENDIAN. + * Default endian value is BIG_ENDIAN. Review comment: Also, is the file format still compatible with 0.8.0? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314340102 ## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java ## @@ -135,9 +135,9 @@ */ public static int pageCheckSizeThreshold = 100; /** - * Default endian value is LITTLE_ENDIAN. + * Default endian value is BIG_ENDIAN. Review comment: it is ok, but why? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312744740 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeResource manages files and caches of readers, writers, MeasurementSchemas and + * modifications to avoid unnecessary object creations and file openings. + */ +public class MergeResource { + + private static final Logger logger = LoggerFactory.getLogger(MergeResource.class); + + private List seqFiles; + private List unseqFiles; + + private QueryContext mergeContext = new QueryContext(); Review comment: remove this? 
or rename it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314337283 ## File path: server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java ## @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader.chunkRelated; + +import java.io.IOException; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.db.utils.TimeValuePairUtils; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; + +public class CachedDiskChunkReader implements IPointReader { Review comment: I see. How about implementing this class by inheriting from DiskChunkReader? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314328295 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java ## @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.task; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.commons.io.FileUtils; +import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache; +import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache; +import org.apache.iotdb.db.engine.merge.manage.MergeContext; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.recover.MergeLogger; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; +import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.schema.FileSchema; +import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeFileTask merges the merge temporary files with the seqFiles, either move the merged + * chunks in the temp files into the seqFiles or move the unmerged chunks into the merge temp + * files, depending on which one is the majority. 
+ */ +class MergeFileTask { + + private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class); + + private String taskName; + private MergeContext context; + private MergeLogger mergeLogger; + private MergeResource resource; + private List unmergedFiles; + + MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger, + MergeResource resource, List unmergedSeqFiles) { +this.taskName = taskName; +this.context = context; +this.mergeLogger = mergeLogger; +this.resource = resource; +this.unmergedFiles = unmergedSeqFiles; + } + + void mergeFiles() throws IOException { +// decide whether to write the unmerged chunks to the merge files or to move the merged chunks +// back to the origin seqFile's +if (logger.isInfoEnabled()) { + logger.info("{} starts to merge {} files", taskName, unmergedFiles.size()); +} +long startTime = System.currentTimeMillis(); +int cnt = 0; +for (TsFileResource seqFile : unmergedFiles) { + int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0); + int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0); + if (mergedChunkNum >= unmergedChunkNum) { +// move the unmerged data to the new file +if (logger.isInfoEnabled()) { + logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} " + + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum); +} +moveUnmergedToNew(seqFile); + } else { +// move the merged data to the old file +if (logger.isInfoEnabled()) { + logger.info("{} moving merged data of {} to the old file {} merged chunks, {} " + + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum); +} +moveMergedToOld(seqFile); + } + cnt ++; + if (logger.isInfoEnabled()) { +logger.debug("{} has merged {}/{} files", taskName, cnt, unmergedFiles.size()); + } +} +if (logger.isInfoEnabled()) { + logger.info("{} has merged all files after {}ms", taskName, System.currentTimeMillis() - 
startTime); +} +mergeLogger.logMergeEnd(); + } + + private void moveMergedToOld(TsFileResource seqFile)
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314334024 ## File path: server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java ## @@ -34,7 +35,7 @@ import org.slf4j.LoggerFactory; /** - * FileReaderManager is a singleton, which is used to manage + * resource.getSeqFiles()ager is a singleton, which is used to manage Review comment: incorrect... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312745663 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314335046 ## File path: server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java ## @@ -109,52 +109,6 @@ public long assignJobId() { return jobId; } - /** - * Begin query and set query tokens of queryPaths. This method is used for projection - * calculation. - */ - public void beginQueryOfGivenQueryPaths(long jobId, List queryPaths) Review comment: please maintain the java doc of this class. (by the way, why delete these two methods?) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313207654 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java ## @@ -0,0 +1,424 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.task; + +import static org.apache.iotdb.db.utils.MergeUtils.writeBatchPoint; +import static org.apache.iotdb.db.utils.MergeUtils.writeTVPair; +import static org.apache.iotdb.db.utils.QueryUtils.modifyChunkMetaData; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.PriorityQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.merge.manage.MergeContext; +import org.apache.iotdb.db.engine.merge.manage.MergeManager; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.recover.MergeLogger; +import org.apache.iotdb.db.engine.merge.selector.MergePathSelector; +import org.apache.iotdb.db.engine.merge.selector.NaivePathSelector; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.db.utils.MergeUtils.MetaListEntry; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class MergeMultiChunkTask { + + private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class); + private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig() + .getChunkMergePointThreshold(); + + private MergeLogger mergeLogger; + private List unmergedSeries; + + private String taskName; + private MergeResource resource; + private TimeValuePair[] currTimeValuePairs; + private boolean fullMerge; + + private MergeContext mergeContext; + + private AtomicInteger mergedChunkNum = new AtomicInteger(); + private AtomicInteger unmergedChunkNum = new AtomicInteger(); + private int mergedSeriesCnt; + private double progress; + + private int concurrentMergeSeriesNum; + private List currMergingPaths = new ArrayList<>(); + + MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger mergeLogger, + MergeResource mergeResource, boolean fullMerge, List unmergedSeries, + int concurrentMergeSeriesNum) { +this.mergeContext = context; +this.taskName = taskName; +this.mergeLogger = mergeLogger; +this.resource = mergeResource; +this.fullMerge = fullMerge; +this.unmergedSeries = unmergedSeries; +this.concurrentMergeSeriesNum = concurrentMergeSeriesNum; + } + + void mergeSeries() throws IOException { +if (logger.isInfoEnabled()) { + logger.info("{} starts to merge {} series", taskName, unmergedSeries.size()); +} +long startTime = System.currentTimeMillis(); +for (TsFileResource seqFile : resource.getSeqFiles()) { + mergeContext.getUnmergedChunkStartTimes().put(seqFile, new HashMap<>()); +} +// merge each series and write data into each seqFile's corresponding temp merge file +List> devicePaths = MergeUtils.splitPathsByDevice(unmergedSeries);
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314336525 ## File path: server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java ## @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader.chunkRelated; + +import java.io.IOException; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.db.utils.TimeValuePairUtils; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; + +public class CachedDiskChunkReader implements IPointReader { Review comment: I wonder why such a new class is needed. What is the difference between it and ChunkReader? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313206796 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java ## @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.recover; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.List; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.read.common.Path; + +/** + * MergeLogger records the progress of a merge in file "merge.log" as text lines. 
+ */ +public class MergeLogger { + + public static final String MERGE_LOG_NAME = "merge.log"; + + static final String STR_SEQ_FILES = "seqFiles"; + static final String STR_UNSEQ_FILES = "unseqFiles"; + static final String STR_TIMESERIES = "timeseries"; + static final String STR_START = "start"; + static final String STR_END = "end"; + static final String STR_ALL_TS_END = "all ts end"; + static final String STR_MERGE_START = "merge start"; + static final String STR_MERGE_END = "merge end"; + + private BufferedWriter logStream; + + public MergeLogger(String storageGroupDir) throws IOException { +logStream = new BufferedWriter(new FileWriter(new File(storageGroupDir, MERGE_LOG_NAME), true)); + } + + public void close() throws IOException { +logStream.close(); + } + + public void logTSStart(List paths) throws IOException { +logStream.write(STR_START); +for (Path path : paths) { + logStream.write(" " + path.getFullPath()); +} +logStream.newLine(); +logStream.flush(); + } + + public void logFilePositionUpdate(File file) throws IOException { +logStream.write(String.format("%s %d", file.getAbsolutePath(), file.length())); +logStream.newLine(); +logStream.flush(); + } + + public void logTSEnd() throws IOException { +logStream.write(STR_END); +logStream.newLine(); +logStream.flush(); + } + + public void logAllTsEnd() throws IOException { +logStream.write(STR_ALL_TS_END); +logStream.newLine(); +logStream.flush(); + } + + public void logFileMergeStart(File file, long position) throws IOException { +logStream.write(String.format("%s %d", file.getAbsolutePath(), position)); +logStream.newLine(); +logStream.flush(); + } + + public void logFileMergeEnd() throws IOException { +logStream.write(STR_END); +logStream.newLine(); +logStream.flush(); + } + + public void logMergeEnd() throws IOException { +logStream.write(STR_MERGE_END); +logStream.newLine(); +logStream.flush(); + } + + public void logFiles(MergeResource resource) throws IOException { +logSeqFiles(resource.getSeqFiles()); 
+logUnseqFiles(resource.getUnseqFiles()); + } + + public void logAllTS(List paths) throws IOException { Review comment: I am afraid that this adds a little space overhead... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314339168 ## File path: server/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java ## @@ -64,17 +74,93 @@ public static TimeValuePair getCurrentTimeValuePair(AggreResultData data) { case INT32: return new TimeValuePair(data.getTimestamp(), new TsPrimitiveType.TsInt(data.getIntRet())); case INT64: -return new TimeValuePair(data.getTimestamp(), new TsPrimitiveType.TsLong(data.getLongRet())); +return new TimeValuePair(data.getTimestamp(), +new TsPrimitiveType.TsLong(data.getLongRet())); case FLOAT: -return new TimeValuePair(data.getTimestamp(), new TsPrimitiveType.TsFloat(data.getFloatRet())); +return new TimeValuePair(data.getTimestamp(), +new TsPrimitiveType.TsFloat(data.getFloatRet())); case DOUBLE: -return new TimeValuePair(data.getTimestamp(), new TsPrimitiveType.TsDouble(data.getDoubleRet())); +return new TimeValuePair(data.getTimestamp(), +new TsPrimitiveType.TsDouble(data.getDoubleRet())); case TEXT: -return new TimeValuePair(data.getTimestamp(), new TsPrimitiveType.TsBinary(data.getBinaryRet())); +return new TimeValuePair(data.getTimestamp(), +new TsPrimitiveType.TsBinary(data.getBinaryRet())); case BOOLEAN: -return new TimeValuePair(data.getTimestamp(), new TsPrimitiveType.TsBoolean(data.isBooleanRet())); +return new TimeValuePair(data.getTimestamp(), +new TsPrimitiveType.TsBoolean(data.isBooleanRet())); default: throw new UnSupportedDataTypeException(String.valueOf(data.getDataType())); } } -} + + public static void setCurrentTimeValuePair(BatchData data, TimeValuePair current) { +current.setTimestamp(data.currentTime()); +switch (data.getDataType()) { + case INT32: +current.getValue().setInt(data.getInt()); +break; + case INT64: +current.getValue().setLong(data.getLong()); +break; + case FLOAT: +current.getValue().setFloat(data.getFloat()); +break; + case DOUBLE: 
+current.getValue().setDouble(data.getDouble()); +break; + case TEXT: +current.getValue().setBinary(data.getBinary()); +break; + case BOOLEAN: +current.getValue().setBoolean(data.getBoolean()); +break; + default: +throw new UnSupportedDataTypeException(String.valueOf(data.getDataType())); +} + } + + public static void setTimeValuePair(TimeValuePair from, TimeValuePair to) { +to.setTimestamp(from.getTimestamp()); +switch (from.getValue().getDataType()) { + case INT32: +to.getValue().setInt(from.getValue().getInt()); +break; + case INT64: +to.getValue().setLong(from.getValue().getLong()); +break; + case FLOAT: +to.getValue().setFloat(from.getValue().getFloat()); +break; + case DOUBLE: +to.getValue().setDouble(from.getValue().getDouble()); +break; + case TEXT: +to.getValue().setBinary(from.getValue().getBinary()); +break; + case BOOLEAN: +to.getValue().setBoolean(from.getValue().getBoolean()); +break; + default: +throw new UnSupportedDataTypeException(String.valueOf(from.getValue().getDataType())); +} + } + + public static TimeValuePair getEmptyTimeValuePair(TSDataType dataType) { +switch (dataType) { + case FLOAT: +return new TimeValuePair(0, new TsFloat(0.0f)); Review comment: how about using a static object? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312746240 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314348101 ## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java ## @@ -65,6 +65,12 @@ private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES); protected String file; + private int totalChunkNum; + private TsFileMetaData tsFileMetaData; + + private boolean cacheDeviceMetadata = false; + private Map deviceMetadataMap; Review comment: @little-emotion @LeiRui if we add a cache here, is it helpful for query(Do we need to change some codes in current query process)? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312738956 ## File path: server/iotdb/conf/iotdb-engine.properties ## @@ -155,7 +155,43 @@ concurrent_flush_thread=0 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory # (i.e., whether use ChunkBufferPool), value true, false -chunk_buffer_pool_enable = false +chunk_buffer_pool_enable=false + + +### Merge Configurations + + +# How many thread will be set up to perform merges, 1 by default. +# Set to 1 when less than or equal to 0. +merge_thread_num=1 + +# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default. +# This is only a rough estimation, starting from a relatively small value to avoid OOM. +# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the +# total memory estimation of merge. +# merge_memory_budget=2147483648 + +# When set to true, if some crashed merges are detected during system rebooting, such merges will +# be continued, otherwise, the unfinished parts of such merges will not be continued while the +# finished parts still remains as they are. +# If you are feeling the rebooting is too slow, set this to false, false by default +continue_merge_after_reboot=false + +# A global merge will be performed each such interval, that is, each storage group will be merged +# (if proper merge candidates can be found). Unit: second, default: 1hours. +# When less than or equal to 0, timed merge is disabled. +merge_interval_sec=3600 + +# When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how Review comment: how about if there is no overflow data? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312742401 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeResource manages files and caches of readers, writers, MeasurementSchemas and + * modifications to avoid unnecessary object creations and file openings. 
+ */ +public class MergeResource { + + private static final Logger logger = LoggerFactory.getLogger(MergeResource.class); + + private List seqFiles; + private List unseqFiles; + + private QueryContext mergeContext = new QueryContext(); + + private Map fileReaderCache; Review comment: Why not use `FileReaderManager` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312746271 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.manage; Review comment: move this class to `task` package? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313206578 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.task; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.iotdb.db.engine.merge.manage.MergeContext; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.recover.MergeLogger; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.read.common.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeTask merges given seqFiles and unseqFiles into a new one, which basically consists of three + * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files + *2. 
move the merged chunks in the temp files back to the seqFiles or move the unmerged + *chunks in the seqFiles int temp files and replace the seqFiles with the temp files. + *3. remove unseqFiles + */ +public class MergeTask implements Callable { + + public static final String MERGE_SUFFIX = ".merge"; + private static final Logger logger = LoggerFactory.getLogger(MergeTask.class); + + MergeResource resource; + String storageGroupDir; + MergeLogger mergeLogger; + MergeContext mergeContext = new MergeContext(); + + private MergeCallback callback; + int concurrentMergeSeriesNum; + String taskName; + boolean fullMerge; + + MergeTask(List seqFiles, + List unseqFiles, String storageGroupDir, MergeCallback callback, + String taskName, boolean fullMerge) { +this.resource = new MergeResource(seqFiles, unseqFiles); +this.storageGroupDir = storageGroupDir; +this.callback = callback; +this.taskName = taskName; +this.fullMerge = fullMerge; +this.concurrentMergeSeriesNum = 1; + } + + public MergeTask(MergeResource mergeResource, String storageGroupDir, MergeCallback callback, + String taskName, boolean fullMerge, int concurrentMergeSeriesNum) { +this.resource = mergeResource; +this.storageGroupDir = storageGroupDir; +this.callback = callback; +this.taskName = taskName; +this.fullMerge = fullMerge; +this.concurrentMergeSeriesNum = concurrentMergeSeriesNum; + } + + @Override + public Void call() throws Exception { +try { + doMerge(); +} catch (Exception e) { + logger.error("Runtime exception in merge {}", taskName, e); + cleanUp(false); + // call the callback to make sure the StorageGroup exit merging status, but passing 2 + // empty file lists to avoid files being deleted. 
+ callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME)); + throw e; +} +return null; + } + + private void doMerge() throws IOException { +if (logger.isInfoEnabled()) { + logger.info("{} starts to merge {} seqFiles, {} unseqFiles", taskName, + resource.getSeqFiles().size(), resource.getUnseqFiles().size()); +} +long startTime = System.currentTimeMillis(); +long totalFileSize = MergeUtils.collectFileSizes(resource.getSeqFiles(), +resource.getUnseqFiles()); +mergeLogger = new MergeLogger(storageGroupDir); + +mergeLogger.logFiles(resource); Review comment: We need to make sure that only one MergeTask runs per storage group at a time; otherwise there is a concurrency problem (for example, every task writes its merge log under the same fixed name, `MergeLogger.MERGE_LOG_NAME`, in the storage group directory). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313205759 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); Review comment: maybe FileReaderManager can help improve the performance. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312739536 ## File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java ## @@ -229,6 +230,45 @@ */ private boolean chunkBufferPoolEnable = false; + /** + * How much memory (in byte) can be used by a single merge task. + */ + private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2); + + /** + * How many thread will be set up to perform merges. + */ + private int mergeThreadNum = 1; + + /** + * When set to true, if some crashed merges are detected during system rebooting, such merges will + * be continued, otherwise, the unfinished parts of such merges will not be continued while the + * finished parts still remain as they are. + */ + private boolean continueMergeAfterReboot = true; + + /** + * A global merge will be performed each such interval, that is, each storage group will be merged + * (if proper merge candidates can be found). Unit: second. + */ + private long mergeIntervalSec = 2 * 3600L; + + /** + * When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how + * much they are overflowed). This may increase merge overhead depending on how much the SeqFiles + * are overflowed. + */ + private boolean forceFullMerge = false; + + /** + * During a merge, if a chunk with less number of chunks than this parameter, the chunk will be + * merged with its succeeding chunks even if it is not overflowed, until the merged chunks reach + * this threshold and the new chunk will be flushed. + */ + private int chunkMergePointThreshold = 512; + + private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.MAX_SERIES_NUM; Review comment: How can users change this setting? Is `mergeFileStrategy` exposed as a configuration item, and if so, please document it like the other merge parameters. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313205735 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); Review comment: maybe FileReaderManager can help improve the performance. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services