[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314601212

## File path: server/src/assembly/resources/conf/iotdb-engine.properties ##
@@ -155,7 +155,52 @@ concurrent_flush_thread=0
 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory
 # (i.e., whether use ChunkBufferPool), value true, false
-chunk_buffer_pool_enable = false
+chunk_buffer_pool_enable=false
+
+
+### Merge Configurations
+
+
+# How many thread will be set up to perform merge main tasks, 1 by default.
+# Set to 1 when less than or equal to 0.
+merge_thread_num=1
+
+# How many thread will be set up to perform merge chunk sub-tasks, 8 by default.
+# Set to 1 when less than or equal to 0.
+merge_chunk_subthread_num=8
+
+# If one merge file selection runs for more than this time, it will be ended and its current
+# selection will be used as final selection. Unit: millis.
+# When < 0, it means time is unbounded.
+merge_fileSelection_time_budget=3
+
+# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default.
+# This is only a rough estimation, starting from a relatively small value to avoid OOM.
+# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the
+# total memory estimation of merge.
+# merge_memory_budget=2147483648
+
+# When set to true, if some crashed merges are detected during system rebooting, such merges will
+# be continued, otherwise, the unfinished parts of such merges will not be continued while the
+# finished parts still remains as they are.
+# If you are feeling the rebooting is too slow, set this to false, false by default
+continue_merge_after_reboot=false
+
+# A global merge will be performed each such interval, that is, each storage group will be merged
+# (if proper merge candidates can be found). Unit: second, default: 1hours.
+# When less than or equal to 0, timed merge is disabled.
+merge_interval_sec=3600
+
+# When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how
+# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles
+# are overflowed.
+force_full_merge=false
+
+# During a merge, if a chunk with less number of chunks than this parameter, the chunk will be
+# merged with its succeeding chunks even if it is not overflowed, until the merged chunks reach
+# this threshold and the new chunk will be flushed.
+# When less than 0, this mechanism is disabled.
+chunk_merge_point_threshold=4096

Review comment:
 Any suggestion?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
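The "Set to 1 when less than or equal to 0" rule described for merge_thread_num and merge_chunk_subthread_num can be sketched as below. This is only an illustration: the class MergeConfigSketch and the method threadCount are hypothetical names, and how IoTDB actually loads these properties is not shown in the quoted diff.

```java
import java.util.Properties;

class MergeConfigSketch {
  // Reads an integer thread-count property. Values <= 0 fall back to 1,
  // as the comments in the quoted properties file describe.
  // (Hypothetical helper, not taken from the IoTDB code base.)
  static int threadCount(Properties props, String key, int defaultValue) {
    String raw = props.getProperty(key, Integer.toString(defaultValue)).trim();
    int value = Integer.parseInt(raw);
    return value <= 0 ? 1 : value;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("merge_thread_num", "0");
    props.setProperty("merge_chunk_subthread_num", "8");
    System.out.println(threadCount(props, "merge_thread_num", 1));
    System.out.println(threadCount(props, "merge_chunk_subthread_num", 8));
  }
}
```

A missing key simply yields the supplied default, so an absent merge_chunk_subthread_num would still resolve to 8 in this sketch.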
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314601299

## File path: server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetadataUtils.java ##
@@ -95,6 +97,9 @@ public static TsDeviceMetadata getTsDeviceMetaData(String filePath, Path seriesP
 }
 }
 }
+for (List<ChunkMetaData> chunkMetaDataList : pathToChunkMetaDataList.values()) {
+ chunkMetaDataList.sort(Comparator.comparingLong(ChunkMetaData::getStartTime));
+}

Review comment:
 Yes.
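The added loop sorts each per-path chunk metadata list by start time. A minimal, self-contained sketch of the same idea follows; the nested Chunk class is a hypothetical stand-in for ChunkMetaData that carries only a start time, and the map key type is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChunkSortSketch {
  // Hypothetical stand-in for ChunkMetaData; only the start time matters here.
  static class Chunk {
    private final long startTime;
    Chunk(long startTime) { this.startTime = startTime; }
    long getStartTime() { return startTime; }
  }

  // Sorts every per-path list in place, ascending by start time.
  static void sortByStartTime(Map<String, List<Chunk>> pathToChunks) {
    for (List<Chunk> chunks : pathToChunks.values()) {
      chunks.sort(Comparator.comparingLong(Chunk::getStartTime));
    }
  }

  public static void main(String[] args) {
    Map<String, List<Chunk>> pathToChunks = new HashMap<>();
    pathToChunks.put("root.sg.d1.s1",
        new ArrayList<>(Arrays.asList(new Chunk(30), new Chunk(10), new Chunk(20))));
    sortByStartTime(pathToChunks);
    for (Chunk chunk : pathToChunks.get("root.sg.d1.s1")) {
      System.out.println(chunk.getStartTime());
    }
  }
}
```

Comparator.comparingLong avoids boxing the long keys, which is presumably why the diff uses it rather than Comparator.comparing.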
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314601067

## File path: server/src/assembly/resources/conf/iotdb-engine.properties ##
@@ -155,7 +155,52 @@ concurrent_flush_thread=0
 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory
 # (i.e., whether use ChunkBufferPool), value true, false
-chunk_buffer_pool_enable = false
+chunk_buffer_pool_enable=false
+
+
+### Merge Configurations
+
+
+# How many thread will be set up to perform merge main tasks, 1 by default.
+# Set to 1 when less than or equal to 0.
+merge_thread_num=1
+
+# How many thread will be set up to perform merge chunk sub-tasks, 8 by default.
+# Set to 1 when less than or equal to 0.
+merge_chunk_subthread_num=8
+
+# If one merge file selection runs for more than this time, it will be ended and its current
+# selection will be used as final selection. Unit: millis.
+# When < 0, it means time is unbounded.
+merge_fileSelection_time_budget=3
+
+# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default.
+# This is only a rough estimation, starting from a relatively small value to avoid OOM.
+# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the
+# total memory estimation of merge.
+# merge_memory_budget=2147483648
+
+# When set to true, if some crashed merges are detected during system rebooting, such merges will
+# be continued, otherwise, the unfinished parts of such merges will not be continued while the
+# finished parts still remains as they are.
+# If you are feeling the rebooting is too slow, set this to false, false by default
+continue_merge_after_reboot=false
+
+# A global merge will be performed each such interval, that is, each storage group will be merged
+# (if proper merge candidates can be found). Unit: second, default: 1hours.
+# When less than or equal to 0, timed merge is disabled.
+merge_interval_sec=3600
+
+# When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how
+# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles
+# are overflowed.
+force_full_merge=false
+
+# During a merge, if a chunk with less number of chunks than this parameter, the chunk will be

Review comment:
 Fixed.
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314600996

## File path: client/src/main/java/org/apache/iotdb/cli/client/AbstractClient.java ##
@@ -728,12 +728,10 @@ protected static void importCmd(String specialCmd, String cmd, IoTDBConnection c
 }
 }
- protected static void executeQuery(IoTDBConnection connection, String cmd) {
-Statement statement = null;
+ private static void executeQuery(IoTDBConnection connection, String cmd) {
 long startTime = System.currentTimeMillis();
-try {
+try (Statement statement = connection.createStatement();) {

Review comment:
 Fixed.
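The change above replaces a manually managed Statement with try-with-resources. The sketch below shows why that helps, using a hypothetical FakeStatement in place of a real JDBC Statement so it runs without a database: close() is invoked whether the body returns normally or throws.

```java
class TryWithResourcesSketch {
  // Hypothetical stand-in for a JDBC Statement; records whether close() ran.
  static class FakeStatement implements AutoCloseable {
    boolean closed = false;

    String execute(String cmd) {
      if (cmd.isEmpty()) {
        throw new IllegalArgumentException("empty command");
      }
      return "ok: " + cmd;
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  // Kept only so the demo can inspect the most recently used statement.
  static FakeStatement last;

  static String executeQuery(String cmd) {
    // The resource is closed automatically before the catch block runs.
    try (FakeStatement statement = new FakeStatement()) {
      last = statement;
      return statement.execute(cmd);
    } catch (IllegalArgumentException e) {
      return "failed: " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(executeQuery("show timeseries") + ", closed=" + last.closed);
    System.out.println(executeQuery("") + ", closed=" + last.closed);
  }
}
```

The trailing semicolon inside the resource list in the quoted diff is legal Java, although it is usually omitted.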
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313297965

## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerier.java ##
@@ -62,4 +62,9 @@
 */
 List convertSpace2TimePartition(List paths, long spacePartitionStartPos, long spacePartitionEndPos) throws IOException;
+
+ /**
+ * clear caches (if used) to release memory.
+ */
+ void clear();

Review comment:
 Rename this class to IMetadataQuerier?
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313248755

## File path: server/src/main/java/org/apache/iotdb/db/utils/TsPrimitiveType.java ##
@@ -74,6 +74,30 @@ public Binary getBinary() {
 throw new UnsupportedOperationException("getBinary() is not supported for current sub-class");
 }
+ public void setBoolean(boolean val) {
+throw new UnsupportedOperationException("getBoolean() is not supported for current sub-class");
+ }
+
+ public void setInt(int val) {
+throw new UnsupportedOperationException("getInt() is not supported for current sub-class");

Review comment:
 The same problem here: the message says getInt() but the method is setInt().
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313734754 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String me
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313248648

## File path: server/src/main/java/org/apache/iotdb/db/utils/TsPrimitiveType.java ##
@@ -74,6 +74,30 @@ public Binary getBinary() {
 throw new UnsupportedOperationException("getBinary() is not supported for current sub-class");
 }
+ public void setBoolean(boolean val) {
+throw new UnsupportedOperationException("getBoolean() is not supported for current sub-class");

Review comment:
```suggestion
throw new UnsupportedOperationException("setBoolean() is not supported for current sub-class");
```
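The copy-paste slip flagged here (a setter whose message names the getter) can be made harder to repeat by building the message in one place. The sketch below is one possible arrangement, not the project's code: PrimitiveSketch, Value, IntValue and the unsupported helper are all hypothetical names.

```java
class PrimitiveSketch {
  // Hypothetical base type: every accessor is unsupported unless a subclass overrides it.
  abstract static class Value {
    // Single place that formats the message, so each method only supplies its own name.
    private static UnsupportedOperationException unsupported(String method) {
      return new UnsupportedOperationException(
          method + " is not supported for current sub-class");
    }

    public int getInt() {
      throw unsupported("getInt()");
    }

    public void setInt(int val) {
      throw unsupported("setInt()");
    }

    public void setBoolean(boolean val) {
      throw unsupported("setBoolean()");
    }
  }

  // Hypothetical subclass that supports only the int accessors.
  static class IntValue extends Value {
    private int value;

    @Override
    public int getInt() {
      return value;
    }

    @Override
    public void setInt(int val) {
      this.value = val;
    }
  }

  public static void main(String[] args) {
    IntValue v = new IntValue();
    v.setInt(7);
    System.out.println(v.getInt());
    try {
      v.setBoolean(true);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```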
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314219509

## File path: server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java ##
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.resourceRelated;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.query.reader.chunkRelated.CachedDiskChunkReader;
+import org.apache.iotdb.db.query.reader.universal.CachedPriorityMergeReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
+
+public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {

Review comment:
```suggestion
public class CachedUnseqFilesMergeReader extends CachedPriorityMergeReader {
```
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313685171

## File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java ##
@@ -185,53 +204,95 @@ public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
 private void recover() throws ProcessorException {
 logger.info("recover Storage Group {}", storageGroupName);
-// collect TsFiles from sequential data directory
-List tsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
-recoverSeqFiles(tsFiles);
-
-// collect TsFiles from unsequential data directory
-tsFiles = getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
-recoverUnseqFiles(tsFiles);
+try {
+ // collect TsFiles from sequential and unsequential data directory
+ List seqTsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
+ List unseqTsFiles =
+ getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+
+ recoverSeqFiles(seqTsFiles);
+ recoverUnseqFiles(unseqTsFiles);
+
+ String taskName = storageGroupName + "-" + System.currentTimeMillis();
+ File mergingMods = new File(storageGroupSysDir, MERGING_MODIFICAITON_FILE_NAME);
+ if (mergingMods.exists()) {
+mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);

Review comment:
 How about changing the type of the parameter from String to File?
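A rough sketch of what this suggestion could look like is given below. ModificationFileSketch is a hypothetical class, not the real ModificationFile, whose constructor is only known from the diff to accept a String path; the point is that a File-typed parameter lets the caller reuse the File it already built instead of concatenating the path a second time.

```java
import java.io.File;

class ModificationFileSketch {
  private final File file;

  // Hypothetical constructor taking a File rather than a String path.
  ModificationFileSketch(File file) {
    this.file = file;
  }

  String getFilePath() {
    return file.getPath();
  }

  public static void main(String[] args) {
    // The caller builds the File once and passes it on, with no manual
    // "dir + File.separator + name" concatenation.
    File mergingMods = new File("sysDir", "merge.mods");
    ModificationFileSketch mods = new ModificationFileSketch(mergingMods);
    System.out.println(mods.getFilePath());
  }
}
```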
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313716134

## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java ##
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeManager provides a ThreadPool to queue and run all merge tasks to restrain the total
+ * resources occupied by merge and manages a Timer to periodically issue a global merge.
+ */
+public class MergeManager implements IService {
+
+ private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);
+ private static final MergeManager INSTANCE = new MergeManager();
+
+ private AtomicInteger threadCnt = new AtomicInteger();

Review comment:
 Do we need to separate the merge thread and merge chunk sub-thread counter?
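One way to read this question: give each pool its own counter so thread names stay contiguous per pool. The sketch below illustrates that option only; NamedFactory and the name prefixes are hypothetical, and the diff does not show how MergeManager actually uses threadCnt.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class MergeThreadNamingSketch {
  // Hypothetical factory: each instance owns its counter, so main-task threads
  // and chunk sub-task threads are numbered independently.
  static class NamedFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    NamedFactory(String prefix) {
      this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
      return new Thread(r, prefix + "-" + counter.getAndIncrement());
    }
  }

  public static void main(String[] args) {
    ThreadFactory mergeThreads = new NamedFactory("MergeThread");
    ThreadFactory chunkSubThreads = new NamedFactory("MergeChunkSubThread");
    Runnable noop = () -> { };
    System.out.println(mergeThreads.newThread(noop).getName());
    System.out.println(chunkSubThreads.newThread(noop).getName());
    System.out.println(mergeThreads.newThread(noop).getName());
  }
}
```

Such factories could be handed to Executors.newFixedThreadPool(int, ThreadFactory) for the two pools; with a single shared counter the numbering would interleave across both.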
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313352319

## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java ##
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.write.writer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+/**
+ * ForceAppendTsFileWriter opens a COMPLETE TsFile, reads and truncate its metadata to support
+ * appending new data.
+ */
+public class ForceAppendTsFileWriter extends TsFileIOWriter{
+
+ private Map knownSchemas = new HashMap<>();

Review comment:
```suggestion
 private Map knownSchemas;
```
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313829348 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java ## @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.selector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.MergeException; +import org.apache.iotdb.db.utils.MergeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MaxFileMergeFileSelector selects the most files from given seqFiles and unseqFiles which can be + * merged without exceeding given memory budget. 
It always assume the number of timeseries being + * queried at the same time is 1 to maximize the number of file merged. + */ +public class MaxFileMergeFileSelector implements MergeFileSelector { + + private static final Logger logger = LoggerFactory.getLogger(MaxFileMergeFileSelector.class); + private static final String LOG_FILE_COST = "Memory cost of file {} is {}"; + + MergeResource resource; + + long totalCost; + private long memoryBudget; + private long maxSeqFileCost; + + // the number of timeseries being queried at the same time + int concurrentMergeNum = 1; + + /** + * Total metadata size of each file. + */ + private Map fileMetaSizeMap = new HashMap<>(); + /** + * Maximum memory cost of querying a timeseries in each file. + */ + private Map maxSeriesQueryCostMap = new HashMap<>(); + + List selectedUnseqFiles; + List selectedSeqFiles; + + private Collection tmpSelectedSeqFiles; + private long tempMaxSeqFileCost; + + private boolean[] seqSelected; + private int seqSelectedNum; + + public MaxFileMergeFileSelector(MergeResource resource, long memoryBudget) { +this.resource = resource; +this.memoryBudget = memoryBudget; + } + + /** + * Select merge candidates from seqFiles and unseqFiles under the given memoryBudget. + * This process iteratively adds the next unseqFile from unseqFiles and its overlapping seqFiles + * as newly-added candidates and computes their estimated memory cost. If the current cost + * pluses the new cost is still under the budget, accept the unseqFile and the seqFiles as + * candidates, otherwise go to the next iteration. + * The memory cost of a file is calculated in two ways: + *The rough estimation: for a seqFile, the size of its metadata is used for estimation. 
+ *Since in the worst case, the file only contains one timeseries and all its metadata will + *be loaded into memory with at most one actual data page (which is negligible) and writing + *the timeseries into a new file generate metadata of the similar size, so the size of all + *seqFiles' metadata (generated when writing new chunks) pluses the largest one (loaded + *when reading a timeseries from the seqFiles) is the total estimation of all seqFiles; for + *an unseqFile, since the merge reader may read all chunks of a series to perform a merge + *read, the whole file may be loaded into memory, so we use the file's length as the + *maximum estimation. + *The tight estimation: based on the rough estimation, we scan the file's metadata to + *count the number of chunks for each series, find the series which have the most + *chunks in the file and use its chunk proportion to refine the rough estimation. + * The rough estimation is performed first, if no candidates can be found using rough + * estimation, we run the selection again with tight estimation. + * @return two lists of TsFileResource, the former is selected seqFiles and the latter is + * selected unseqFiles or an empty array if there are n
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313736890 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String me
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313823707 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeResource manages files and caches of readers, writers, MeasurementSchemas and + * modifications to avoid unnecessary object creations and file openings. 
+ */ +public class MergeResource { + + private static final Logger logger = LoggerFactory.getLogger(MergeResource.class); + + private List seqFiles; + private List unseqFiles; + + private QueryContext mergeContext = new QueryContext(); + + private Map fileReaderCache; + private Map fileWriterCache = new HashMap<>(); + private Map> modificationCache = new HashMap<>(); + private Map measurementSchemaMap = new HashMap<>(); + private Map chunkWriterCache = new ConcurrentHashMap<>(); + + private boolean cacheDeviceMeta = false; + + public MergeResource(List seqFiles, List unseqFiles) { +this.seqFiles = seqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList()); +this.unseqFiles = + unseqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList()); +this.fileReaderCache = new HashMap<>(); + } + + public void clear() throws IOException { +for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) { + sequenceReader.close(); +} + +fileReaderCache.clear(); +fileWriterCache.clear(); +modificationCache.clear(); +measurementSchemaMap.clear(); +chunkWriterCache.clear(); + } + + public MeasurementSchema getSchema(String measurementId) { +return measurementSchemaMap.get(measurementId); + } + + /** + * Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a + * SeqFile. The path of the merge temp file will be the seqFile's + ".merge". + * @param resource + * @return A RestorableTsFileIOWriter of a merge temp file for a SeqFile. + * @throws IOException + */ + public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException { +RestorableTsFileIOWriter writer = fileWriterCache.get(resource); +if (writer == null) { + writer = new RestorableTsFileIOWriter(new File(resource.getFile().getPath() + MERGE_SUFFIX)); + fileWriterCache.put(resource, writer); +} +return writer; + } + + /** + * Query ChunkMetadata of a timeseries from the given TsFile (seq or unseq). 
The ChunkMetadata + * is not cached sinc
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313687987 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java ## @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.recover; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.List; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.read.common.Path; + +/** + * MergeLogger records the progress of a merge in file "merge.log" as text lines. 
+ */ +public class MergeLogger { + + public static final String MERGE_LOG_NAME = "merge.log"; + + static final String STR_SEQ_FILES = "seqFiles"; + static final String STR_UNSEQ_FILES = "unseqFiles"; + static final String STR_TIMESERIES = "timeseries"; + static final String STR_START = "start"; + static final String STR_END = "end"; + static final String STR_ALL_TS_END = "all ts end"; + static final String STR_MERGE_START = "merge start"; + static final String STR_MERGE_END = "merge end"; + + private BufferedWriter logStream; + + public MergeLogger(String storageGroupDir) throws IOException { +logStream = new BufferedWriter(new FileWriter(new File(storageGroupDir, MERGE_LOG_NAME), true)); + } + + public void close() throws IOException { +logStream.close(); + } + + public void logTSStart(List paths) throws IOException { +logStream.write(STR_START); +for (Path path : paths) { + logStream.write(" " + path.getFullPath()); +} +logStream.newLine(); +logStream.flush(); + } + + public void logFilePositionUpdate(File file) throws IOException { Review comment: ```suggestion public void logFilePosition(File file) throws IOException { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
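For readers following the log format in this diff, here is a minimal sketch (not the PR's code) of the line that `logTSStart` appears to write: the `start` keyword, then each full path prefixed by a space, then a newline. The class name `MergeLogLineSketch`, the method `formatTsStart`, and the in-memory `StringWriter` sink are assumptions for illustration; the real `MergeLogger` appends to `merge.log` on disk.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; mirrors the write/newLine/flush sequence shown in the diff.
class MergeLogLineSketch {
  static final String STR_START = "start";

  // Returns the text one logTSStart-style call would emit for the given paths.
  static String formatTsStart(List<String> fullPaths) {
    StringWriter sink = new StringWriter();
    try (BufferedWriter logStream = new BufferedWriter(sink)) {
      logStream.write(STR_START);
      for (String path : fullPaths) {
        logStream.write(" " + path);
      }
      logStream.newLine();
      logStream.flush();
    } catch (IOException e) {
      // A StringWriter should not fail, but the writer API still declares IOException.
      throw new UncheckedIOException(e);
    }
    return sink.toString();
  }

  public static void main(String[] args) {
    System.out.print(formatTsStart(Arrays.asList("root.sg.d1.s1", "root.sg.d1.s2")));
  }
}
```

Flushing after every line, as the diff does, keeps the log usable for recovery after a crash at the cost of more frequent writes.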
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314576210 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java ## @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.recover; + +import static org.apache.iotdb.db.engine.merge.recover.MergeLogger.*; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; +import java.util.Map.Entry; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.task.MergeTask; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.read.common.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * LogAnalyzer scans the "merge.log" file and recovers information such as files of last merge, + * the last available positions of each file and how many timeseries and files have been merged. 
+ */ +public class LogAnalyzer { + + private static final Logger logger = LoggerFactory.getLogger(LogAnalyzer.class); + + private MergeResource resource; + private String taskName; + private File logFile; + + private Map fileLastPositions = new HashMap<>(); + private Map tempFileLastPositions = new HashMap<>(); + + private List mergedPaths = new ArrayList<>(); + private List unmergedPaths; + private List unmergedFiles; + private String currLine; + + private Status status; + + public LogAnalyzer(MergeResource resource, String taskName, File logFile) { +this.resource = resource; +this.taskName = taskName; +this.logFile = logFile; + } + + /** + * Scan through the logs to find out where the last merge has stopped and store the information + * about the progress in the fields. + * @return a Status indicating the completed stage of the last merge. + * @throws IOException + */ + public Status analyze() throws IOException { +status = Status.NONE; +try (BufferedReader bufferedReader = new BufferedReader(new FileReader(logFile))) { + currLine = bufferedReader.readLine(); + if (currLine != null) { +analyzeSeqFiles(bufferedReader); + +analyzeUnseqFiles(bufferedReader); + +analyzeUnmergedSeries(bufferedReader); + +analyzeMergedSeries(bufferedReader, unmergedPaths); + +analyzeMergedFiles(bufferedReader); + } +} +return status; + } + + private void analyzeUnmergedSeries(BufferedReader bufferedReader) throws IOException { +if (!STR_TIMESERIES.equals(currLine)) { + return; +} +long startTime = System.currentTimeMillis(); +List paths = new ArrayList<>(); +while ((currLine = bufferedReader.readLine()) != null) { + if (STR_MERGE_START.equals(currLine)) { +break; + } + paths.add(new Path(currLine)); +} +if (logger.isDebugEnabled()) { + logger.debug("{} found {} seq files after {}ms", taskName, paths.size(), + (System.currentTimeMillis() - startTime)); +} +unmergedPaths = paths; Review comment: allPathsToBeMerged? This is an automated message from the Apache Git Service. 
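The naming question above is easier to judge with the parsing step isolated. The sketch below is an assumption-laden simplification of `analyzeUnmergedSeries`: it reads from a string instead of `merge.log`, returns plain strings instead of `Path` objects, and returns an empty list when the `timeseries` header is missing (the diff simply returns early). The class and method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified version of the "timeseries" section scan.
class SeriesSectionSketch {
  static final String STR_TIMESERIES = "timeseries";
  static final String STR_MERGE_START = "merge start";

  // Collects every path listed between the "timeseries" header and "merge start".
  static List<String> readPathsToBeMerged(String logText) {
    List<String> paths = new ArrayList<>();
    try (BufferedReader reader = new BufferedReader(new StringReader(logText))) {
      String line = reader.readLine();
      if (!STR_TIMESERIES.equals(line)) {
        return paths; // section absent: nothing was recorded
      }
      while ((line = reader.readLine()) != null) {
        if (STR_MERGE_START.equals(line)) {
          break; // end of the path list
        }
        paths.add(line);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return paths;
  }

  public static void main(String[] args) {
    String log = "timeseries\nroot.sg.d1.s1\nroot.sg.d1.s2\nmerge start\n";
    System.out.println(readPathsToBeMerged(log));
  }
}
```

At this point in the scan the list holds every series scheduled for the merge, which is presumably why a name like `allPathsToBeMerged` was proposed.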
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313364622

## File path: server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java ##
@@ -34,7 +35,7 @@ import org.slf4j.LoggerFactory;
 /**
- * FileReaderManager is a singleton, which is used to manage
+ * resource.getSeqFiles()ager is a singleton, which is used to manage

Review comment: What is "ager"? The class name "FileReaderManager" in this Javadoc line appears to have been partly overwritten by "resource.getSeqFiles()".
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313370128

## File path: server/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java ##
@@ -91,13 +89,13 @@ void removeUsedFilesForGivenJob(long jobId) {
 * so sealedFilePathsMap.get(jobId) or unsealedFilePathsMap.get(jobId)
 * must not return null.
 */
- void addFilePathToMap(long jobId, String filePath, boolean isSealed) {
-ConcurrentHashMap> pathMap = !isSealed ? unsealedFilePathsMap :
+ void addFilePathToMap(long jobId, TsFileResource tsFile, boolean isSealed) {

Review comment:
```suggestion
 void addFilePathToMap(long jobId, TsFileResource tsFile, boolean isClosed) {
```
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313197710 ## File path: server/src/assembly/resources/conf/iotdb-engine.properties ## @@ -155,7 +155,52 @@ concurrent_flush_thread=0 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory # (i.e., whether use ChunkBufferPool), value true, false -chunk_buffer_pool_enable = false +chunk_buffer_pool_enable=false + + +### Merge Configurations + + +# How many thread will be set up to perform merge main tasks, 1 by default. +# Set to 1 when less than or equal to 0. +merge_thread_num=1 + +# How many thread will be set up to perform merge chunk sub-tasks, 8 by default. +# Set to 1 when less than or equal to 0. +merge_chunk_subthread_num=8 + +# If one merge file selection runs for more than this time, it will be ended and its current +# selection will be used as final selection. Unit: millis. +# When < 0, it means time is unbounded. +merge_fileSelection_time_budget=3 + +# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default. +# This is only a rough estimation, starting from a relatively small value to avoid OOM. +# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the +# total memory estimation of merge. +# merge_memory_budget=2147483648 + +# When set to true, if some crashed merges are detected during system rebooting, such merges will +# be continued, otherwise, the unfinished parts of such merges will not be continued while the +# finished parts still remains as they are. +# If you are feeling the rebooting is too slow, set this to false, false by default +continue_merge_after_reboot=false + +# A global merge will be performed each such interval, that is, each storage group will be merged +# (if proper merge candidates can be found). Unit: second, default: 1hours. 
+# When less than or equal to 0, timed merge is disabled. +merge_interval_sec=3600 + +# When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how +# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles +# are overflowed. +force_full_merge=false + +# During a merge, if a chunk with less number of chunks than this parameter, the chunk will be
Review comment: Should "less number of chunks" be "less number of points"?
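The comments in this properties diff state two rules that are easy to misread: thread counts less than or equal to 0 fall back to 1, and the total merge memory estimate is `merge_thread_num * merge_memory_budget`. A minimal sketch of that arithmetic follows; the class `MergeConfigSketch` and its method names are hypothetical and are not taken from IoTDB.

```java
// Hypothetical helpers illustrating the rules described in the config comments.
class MergeConfigSketch {

  // "Set to 1 when less than or equal to 0."
  static int normalizeThreadNum(int configured) {
    return configured <= 0 ? 1 : configured;
  }

  // Rough upper estimate: each merge thread may use up to the per-task budget.
  static long totalMergeMemory(int configuredThreadNum, long memoryBudgetBytes) {
    return (long) normalizeThreadNum(configuredThreadNum) * memoryBudgetBytes;
  }

  public static void main(String[] args) {
    long budget = 2147483648L; // the commented-out merge_memory_budget value, in bytes
    System.out.println(totalMergeMemory(1, budget));
    System.out.println(totalMergeMemory(0, budget)); // falls back to one thread
  }
}
```

With the sample budget of 2147483648 bytes, raising `merge_thread_num` to 2 would double the estimate to 4294967296 bytes, which matters when sizing the JVM heap.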
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314206947 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeResource manages files and caches of readers, writers, MeasurementSchemas and + * modifications to avoid unnecessary object creations and file openings. 
+ */ +public class MergeResource { + + private static final Logger logger = LoggerFactory.getLogger(MergeResource.class); + + private List seqFiles; + private List unseqFiles; + + private QueryContext mergeContext = new QueryContext(); + + private Map fileReaderCache; + private Map fileWriterCache = new HashMap<>(); + private Map> modificationCache = new HashMap<>(); + private Map measurementSchemaMap = new HashMap<>(); + private Map chunkWriterCache = new ConcurrentHashMap<>(); + + private boolean cacheDeviceMeta = false; + + public MergeResource(List seqFiles, List unseqFiles) { +this.seqFiles = seqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList()); +this.unseqFiles = + unseqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList()); +this.fileReaderCache = new HashMap<>(); + } + + public void clear() throws IOException {
Review comment: This method does not do what its name says: it does not clear two of the resources.
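One possible reading of this comment is that `clear()` empties the caches but leaves `seqFiles` and `unseqFiles` untouched. The sketch below illustrates that reading only; it uses a hypothetical, heavily simplified holder (`ResourceHolderSketch`, with strings standing in for `TsFileResource` and a `FakeReader` standing in for `TsFileSequenceReader`). It is not the PR's code, and whether the file lists should really be emptied is for the author to decide.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for MergeResource.
class ResourceHolderSketch {

  // Stand-in for a reader that holds an open file handle.
  static class FakeReader implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
      closed = true;
    }
  }

  final List<String> seqFiles = new ArrayList<>();
  final List<String> unseqFiles = new ArrayList<>();
  final Map<String, FakeReader> fileReaderCache = new HashMap<>();

  // Reuse a cached reader or create one on first access.
  FakeReader getReader(String file) {
    return fileReaderCache.computeIfAbsent(file, f -> new FakeReader());
  }

  // Close cached readers, then drop every piece of state, file lists included.
  void clear() {
    for (FakeReader reader : fileReaderCache.values()) {
      reader.close();
    }
    fileReaderCache.clear();
    seqFiles.clear();
    unseqFiles.clear();
  }

  public static void main(String[] args) {
    ResourceHolderSketch holder = new ResourceHolderSketch();
    holder.seqFiles.add("1.tsfile");
    holder.getReader("1.tsfile");
    holder.clear();
    System.out.println(holder.seqFiles.isEmpty() && holder.fileReaderCache.isEmpty());
  }
}
```

If the file lists are meant to survive for reuse, an alternative would be to rename the method so that it only promises to release the caches.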
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313694327 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeResource manages files and caches of readers, writers, MeasurementSchemas and + * modifications to avoid unnecessary object creations and file openings. 
+ */ +public class MergeResource { + + private static final Logger logger = LoggerFactory.getLogger(MergeResource.class); + + private List seqFiles; + private List unseqFiles; + + private QueryContext mergeContext = new QueryContext(); + + private Map fileReaderCache; + private Map fileWriterCache = new HashMap<>(); + private Map> modificationCache = new HashMap<>(); + private Map measurementSchemaMap = new HashMap<>(); + private Map chunkWriterCache = new ConcurrentHashMap<>(); + + private boolean cacheDeviceMeta = false; + + public MergeResource(List seqFiles, List unseqFiles) { +this.seqFiles = seqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList()); +this.unseqFiles = + unseqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList()); +this.fileReaderCache = new HashMap<>();
Review comment: Why is this initialization left on its own in the constructor, when the other caches are initialized at their declarations?
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313190255

## File path: client/src/main/java/org/apache/iotdb/cli/client/AbstractClient.java ##
@@ -728,12 +728,10 @@ protected static void importCmd(String specialCmd, String cmd, IoTDBConnection c
 }
 }

- protected static void executeQuery(IoTDBConnection connection, String cmd) {
-Statement statement = null;
+ private static void executeQuery(IoTDBConnection connection, String cmd) {
 long startTime = System.currentTimeMillis();
-try {
+try (Statement statement = connection.createStatement();) {

Review comment:
```suggestion
try (Statement statement = connection.createStatement()) {
```
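As background for the suggestion above: a trailing semicolon after the last resource in a try-with-resources header is legal Java but redundant, so dropping it changes nothing at runtime. The sketch below shows the form without the semicolon and the order in which the body and `close()` run. `FakeStatement` is a hypothetical stand-in for `java.sql.Statement` so that the example needs no database connection.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo; FakeStatement replaces java.sql.Statement for illustration.
class TryWithResourcesSketch {
  static final List<String> EVENTS = new ArrayList<>();

  static class FakeStatement implements AutoCloseable {
    String execute(String cmd) {
      EVENTS.add("execute " + cmd);
      return "ok";
    }

    @Override
    public void close() {
      EVENTS.add("close");
    }
  }

  static String executeQuery(String cmd) {
    // No trailing semicolon after the single resource declaration.
    try (FakeStatement statement = new FakeStatement()) {
      return statement.execute(cmd);
    } // close() runs here, after the return value has been computed
  }

  public static void main(String[] args) {
    System.out.println(executeQuery("show timeseries"));
    System.out.println(EVENTS);
  }
}
```

The resource is closed automatically on both normal and exceptional exit, which is what lets the diff delete the manual `Statement statement = null;` bookkeeping.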
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313704705 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeResource manages files and caches of readers, writers, MeasurementSchemas and + * modifications to avoid unnecessary object creations and file openings. 
+ */ +public class MergeResource { + + private static final Logger logger = LoggerFactory.getLogger(MergeResource.class); + + private List seqFiles; + private List unseqFiles; + + private QueryContext mergeContext = new QueryContext(); + + private Map fileReaderCache; + private Map fileWriterCache = new HashMap<>(); + private Map> modificationCache = new HashMap<>(); + private Map measurementSchemaMap = new HashMap<>(); + private Map chunkWriterCache = new ConcurrentHashMap<>(); + + private boolean cacheDeviceMeta = false; + + public MergeResource(List seqFiles, List unseqFiles) { +this.seqFiles = seqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList()); +this.unseqFiles = + unseqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList()); +this.fileReaderCache = new HashMap<>(); + } + + public void clear() throws IOException { +for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) { + sequenceReader.close(); +} + +fileReaderCache.clear(); +fileWriterCache.clear(); +modificationCache.clear(); +measurementSchemaMap.clear(); +chunkWriterCache.clear(); + } + + public MeasurementSchema getSchema(String measurementId) { +return measurementSchemaMap.get(measurementId); + } + + /** + * Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a + * SeqFile. The path of the merge temp file will be the seqFile's + ".merge". + * @param resource + * @return A RestorableTsFileIOWriter of a merge temp file for a SeqFile. + * @throws IOException + */ + public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException { +RestorableTsFileIOWriter writer = fileWriterCache.get(resource); +if (writer == null) { + writer = new RestorableTsFileIOWriter(new File(resource.getFile().getPath() + MERGE_SUFFIX)); + fileWriterCache.put(resource, writer); +} +return writer; + } + + /** + * Query ChunkMetadata of a timeseries from the given TsFile (seq or unseq). 
The ChunkMetadata + * is not cached sinc
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313352890 ## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java ## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.write.writer; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +/** + * ForceAppendTsFileWriter opens a COMPLETE TsFile, reads and truncate its metadata to support + * appending new data. 
+ */ +public class ForceAppendTsFileWriter extends TsFileIOWriter{ + + private Map knownSchemas = new HashMap<>(); + private long truncatePosition; + + public ForceAppendTsFileWriter(File file) throws IOException { +this.out = new DefaultTsFileOutput(file, true); +this.file = file; + +// file doesn't exist +if (file.length() == 0 || !file.exists()) { + throw new TsFileNotCompleteException("File " + file.getPath() + " is not a complete TsFile"); +} + +try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), true)) { + + // this tsfile is not complete + if (!reader.isComplete()) { +throw new TsFileNotCompleteException("File " + file.getPath() + " is not a complete TsFile"); + } + TsFileMetaData fileMetaData = reader.readFileMetadata(); + Map deviceMap = fileMetaData.getDeviceMap(); + long firstDeviceMetaPos = Long.MAX_VALUE; + for (Entry deviceMetadataEntry : deviceMap.entrySet()) { Review comment: ```suggestion for (TsDeviceMetadataIndex deviceMetadataIndex : deviceMap.values()) { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
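A minimal sketch of the suggested change, assuming the loop only needs each index's offset to find the first device metadata position. `DeviceIndex` and its `offset` field are hypothetical stand-ins for `TsDeviceMetadataIndex`, not the real class:

```java
import java.util.HashMap;
import java.util.Map;

public class FirstOffsetSketch {

  /** Hypothetical stand-in for TsDeviceMetadataIndex; only the offset matters here. */
  static final class DeviceIndex {
    final long offset;

    DeviceIndex(long offset) {
      this.offset = offset;
    }
  }

  /** Returns the smallest offset among all indexes, or Long.MAX_VALUE for an empty map. */
  static long firstDeviceMetaPos(Map<String, DeviceIndex> deviceMap) {
    long first = Long.MAX_VALUE;
    // The keys are never used, so iterate values() instead of entrySet().
    for (DeviceIndex index : deviceMap.values()) {
      first = Math.min(first, index.offset);
    }
    return first;
  }

  public static void main(String[] args) {
    Map<String, DeviceIndex> deviceMap = new HashMap<>();
    deviceMap.put("root.sg.d1", new DeviceIndex(4096L));
    deviceMap.put("root.sg.d2", new DeviceIndex(1024L));
    System.out.println(firstDeviceMetaPos(deviceMap)); // prints 1024
  }
}
```

Iterating `values()` drops the unused `Entry` wrapper and the `getValue()` call, which is all the suggestion is after.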
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313197861 ## File path: server/src/assembly/resources/conf/iotdb-engine.properties ## @@ -155,7 +155,52 @@ concurrent_flush_thread=0 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory # (i.e., whether use ChunkBufferPool), value true, false -chunk_buffer_pool_enable = false +chunk_buffer_pool_enable=false + + +### Merge Configurations + + +# How many thread will be set up to perform merge main tasks, 1 by default. +# Set to 1 when less than or equal to 0. +merge_thread_num=1 + +# How many thread will be set up to perform merge chunk sub-tasks, 8 by default. +# Set to 1 when less than or equal to 0. +merge_chunk_subthread_num=8 + +# If one merge file selection runs for more than this time, it will be ended and its current +# selection will be used as final selection. Unit: millis. +# When < 0, it means time is unbounded. +merge_fileSelection_time_budget=3 + +# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default. +# This is only a rough estimation, starting from a relatively small value to avoid OOM. +# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the +# total memory estimation of merge. +# merge_memory_budget=2147483648 + +# When set to true, if some crashed merges are detected during system rebooting, such merges will +# be continued, otherwise, the unfinished parts of such merges will not be continued while the +# finished parts still remains as they are. +# If you are feeling the rebooting is too slow, set this to false, false by default +continue_merge_after_reboot=false + +# A global merge will be performed each such interval, that is, each storage group will be merged +# (if proper merge candidates can be found). Unit: second, default: 1hours. 
+# When less than or equal to 0, timed merge is disabled. +merge_interval_sec=3600 + +# When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how +# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles +# are overflowed. +force_full_merge=false + +# During a merge, if a chunk with less number of chunks than this parameter, the chunk will be +# merged with its succeeding chunks even if it is not overflowed, until the merged chunks reach +# this threshold and the new chunk will be flushed. +# When less than 0, this mechanism is disabled. +chunk_merge_point_threshold=4096 Review comment: by default, this is the point number in one page, would it be better if we increase this threshold? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
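To make the effect of `chunk_merge_point_threshold` easier to discuss, here is a rough sketch of the grouping rule the property comment describes. It is an assumption-laden simplification: `groupChunks` is a hypothetical helper, chunks are reduced to their point counts, and overflow handling is ignored entirely:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkThresholdSketch {

  /**
   * Groups consecutive chunks (given as point counts) so that each flushed chunk
   * reaches the threshold. A negative threshold disables the grouping.
   */
  static List<Integer> groupChunks(List<Integer> chunkPointCounts, int threshold) {
    List<Integer> flushed = new ArrayList<>();
    if (threshold < 0) {
      flushed.addAll(chunkPointCounts);
      return flushed;
    }
    int pending = 0;
    for (int points : chunkPointCounts) {
      pending += points;
      if (pending >= threshold) {
        // Enough points accumulated: flush them as one new chunk.
        flushed.add(pending);
        pending = 0;
      }
    }
    if (pending > 0) {
      // Trailing points that never reached the threshold are flushed as they are.
      flushed.add(pending);
    }
    return flushed;
  }

  public static void main(String[] args) {
    List<Integer> chunks = Arrays.asList(1000, 2000, 2000, 5000, 100);
    System.out.println(groupChunks(chunks, 4096)); // prints [5000, 5000, 100]
  }
}
```

With a larger threshold, more small chunks end up in each flushed chunk, which is the trade-off the review question is about.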
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313774935 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MergePathSelector.java ## @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.selector; + +import java.util.Iterator; +import java.util.List; +import org.apache.iotdb.tsfile.read.common.Path; + +/** + * MergePathSelector select paths to be merged at a time if all paths of a device cannot be + * merged at the same time. + */ +public interface MergePathSelector extends Iterator> { Review comment: ```suggestion public interface IMergePathSelector extends Iterator> { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313704248 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeResource manages files and caches of readers, writers, MeasurementSchemas and + * modifications to avoid unnecessary object creations and file openings. 
+ */ +public class MergeResource { + + private static final Logger logger = LoggerFactory.getLogger(MergeResource.class); + + private List seqFiles; + private List unseqFiles; + + private QueryContext mergeContext = new QueryContext(); + + private Map fileReaderCache; + private Map fileWriterCache = new HashMap<>(); + private Map> modificationCache = new HashMap<>(); + private Map measurementSchemaMap = new HashMap<>(); + private Map chunkWriterCache = new ConcurrentHashMap<>(); + + private boolean cacheDeviceMeta = false; + + public MergeResource(List seqFiles, List unseqFiles) { +this.seqFiles = seqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList()); +this.unseqFiles = + unseqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList()); +this.fileReaderCache = new HashMap<>(); + } + + public void clear() throws IOException { +for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) { + sequenceReader.close(); +} + +fileReaderCache.clear(); +fileWriterCache.clear(); +modificationCache.clear(); +measurementSchemaMap.clear(); +chunkWriterCache.clear(); + } + + public MeasurementSchema getSchema(String measurementId) { +return measurementSchemaMap.get(measurementId); + } + + /** + * Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a + * SeqFile. The path of the merge temp file will be the seqFile's + ".merge". + * @param resource + * @return A RestorableTsFileIOWriter of a merge temp file for a SeqFile. + * @throws IOException + */ + public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException { +RestorableTsFileIOWriter writer = fileWriterCache.get(resource); +if (writer == null) { + writer = new RestorableTsFileIOWriter(new File(resource.getFile().getPath() + MERGE_SUFFIX)); + fileWriterCache.put(resource, writer); +} +return writer; + } + + /** + * Query ChunkMetadata of a timeseries from the given TsFile (seq or unseq). 
The ChunkMetadata + * is not cached sinc
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313774378 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/selector/FileQueryMemMeasurement.java ## @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.selector; + +import java.io.IOException; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; + + +@FunctionalInterface +/** + * Estimate how much memory a file may occupy when being queried during merge. + * @param resource + * @return + * @throws IOException + */ +interface FileQueryMemMeasurement { Review comment: ```suggestion interface IFileQueryMemMeasurement { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313684907

## File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java ##
@@ -165,7 +184,7 @@ public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
     this.fileSchema = constructFileSchema(storageGroupName);
     try {
-      File storageGroupSysDir = new File(systemInfoDir, storageGroupName);
+      storageGroupSysDir = new File(systemInfoDir, storageGroupName);
       if (storageGroupSysDir.mkdirs()) {
         logger.info("Storage Group system Directory {} doesn't exist, create it",
             storageGroupSysDir.getPath());

Review comment:
On line 192, there is a typo in "create".

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313681792

## File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java ##
@@ -143,13 +158,17 @@
    */
   @SuppressWarnings("unused") // to be used in merge
   private ReentrantLock mergeDeleteLock = new ReentrantLock();
+  private ReentrantReadWriteLock mergeLock = new ReentrantReadWriteLock();

Review comment:
add javadoc?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313201751

## File path: server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetadataUtils.java ##
@@ -95,6 +97,9 @@ public static TsDeviceMetadata getTsDeviceMetaData(String filePath, Path seriesP
       }
     }
   }
+    for (List<ChunkMetaData> chunkMetaDataList : pathToChunkMetaDataList.values()) {
+      chunkMetaDataList.sort(Comparator.comparingLong(ChunkMetaData::getStartTime));
+    }

Review comment:
Is it the merge that makes them out of order?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
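For reference, a small self-contained sketch of what the added loop does: every per-path list is sorted in place by chunk start time. `ChunkMeta` is a hypothetical stand-in for `ChunkMetaData` that carries only the start time:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SortChunkMetaSketch {

  /** Hypothetical stand-in for ChunkMetaData; only the start time is modelled. */
  static final class ChunkMeta {
    private final long startTime;

    ChunkMeta(long startTime) {
      this.startTime = startTime;
    }

    long getStartTime() {
      return startTime;
    }
  }

  /** Sorts every per-path list in place, ascending by start time. */
  static void sortByStartTime(Map<String, List<ChunkMeta>> pathToChunkMeta) {
    for (List<ChunkMeta> chunkMetaList : pathToChunkMeta.values()) {
      chunkMetaList.sort(Comparator.comparingLong(ChunkMeta::getStartTime));
    }
  }

  public static void main(String[] args) {
    Map<String, List<ChunkMeta>> pathToChunkMeta = new HashMap<>();
    pathToChunkMeta.put("root.sg.d1.s1", new ArrayList<>(
        Arrays.asList(new ChunkMeta(30), new ChunkMeta(10), new ChunkMeta(20))));
    sortByStartTime(pathToChunkMeta);
    for (ChunkMeta meta : pathToChunkMeta.get("root.sg.d1.s1")) {
      System.out.println(meta.getStartTime()); // prints 10, then 20, then 30
    }
  }
}
```

`List.sort` is stable, so chunks with equal start times keep their original relative order.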
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313736246 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String me
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313208276

## File path: server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java ##
@@ -120,6 +121,17 @@ public UnseqResourceMergeReader(Path seriesPath, List unseqResou
     }
   }
+  public UnseqResourceMergeReader(Path seriesPath, List chunks, Filter filter)

Review comment:
this method is unused, remove?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313774688 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MergeFileSelector.java ## @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.selector; + +import java.util.List; +import org.apache.iotdb.db.exception.MergeException; + +/** + * MergeFileSelector selects a set of files from given seqFiles and unseqFiles which can be + * merged without exceeding given memory budget. + */ +public interface MergeFileSelector { Review comment: ```suggestion public interface IMergeFileSelector { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314209390 ## File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java ## @@ -674,6 +739,136 @@ public void closeUnsealedTsFileProcessor( } } + public void merge(boolean fullMerge) { +writeLock(); +try { + if (isMerging) { +if (logger.isInfoEnabled()) { + logger.info("{} Last merge is ongoing, currently consumed time: {}ms", storageGroupName, + (System.currentTimeMillis() - mergeStartTime)); +} +return; + } + if (unSequenceFileList.isEmpty() || sequenceFileList.isEmpty()) { +logger.info("{} no files to be merged", storageGroupName); +return; + } + + long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget(); + MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList); + MergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource); + try { +List[] mergeFiles = fileSelector.select(); +if (mergeFiles.length == 0) { + logger.info("{} cannot select merge candidates under the budget {}", storageGroupName, + budget); + return; +} +// avoid pending tasks holds the metadata and streams +mergeResource.clear(); +String taskName = storageGroupName + "-" + System.currentTimeMillis(); +// do not cache metadata until true candidates are chosen, or too much metadata will be +// cached during selection +mergeResource.setCacheDeviceMeta(true); + +MergeTask mergeTask = new MergeTask(mergeResource, storageGroupSysDir.getPath(), +this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum()); +mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME); +MergeManager.getINSTANCE().submitMainTask(mergeTask); +if (logger.isInfoEnabled()) { + logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles", + storageGroupName, taskName, 
mergeFiles[0].size(), mergeFiles[1].size()); +} +isMerging = true; +mergeStartTime = System.currentTimeMillis(); + + } catch (MergeException | IOException e) { +logger.error("{} cannot select file for merge", storageGroupName, e); + } +} finally { + writeUnlock(); +} + } + + private MergeFileSelector getMergeFileSelector(long budget, MergeResource resource) { +MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy(); +switch (strategy) { + case MAX_FILE_NUM: +return new MaxFileMergeFileSelector(resource, budget); + case MAX_SERIES_NUM: +return new MaxSeriesMergeFileSelector(resource, budget); + default: +throw new UnsupportedOperationException("Unknown MergeFileStrategy " + strategy); +} + } + + protected void mergeEndAction(List seqFiles, List unseqFiles, Review comment: add javadoc This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
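The guard at the top of `merge()` (take the write lock, bail out if a merge is already running, otherwise mark one as started) can be sketched in isolation. This is only an illustration under assumed names: `MergeGuardSketch`, `tryStartMerge` and `endMerge` are hypothetical and do not exist in `StorageGroupProcessor`:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MergeGuardSketch {

  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private boolean isMerging = false;

  /** Returns true if a new merge was started, false if one is already ongoing. */
  boolean tryStartMerge() {
    lock.writeLock().lock();
    try {
      if (isMerging) {
        return false;
      }
      isMerging = true;
      return true;
    } finally {
      // Always release the lock, including on the early return.
      lock.writeLock().unlock();
    }
  }

  /** Marks the merge as finished; a merge end callback would be the natural caller. */
  void endMerge() {
    lock.writeLock().lock();
    try {
      isMerging = false;
    } finally {
      lock.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    MergeGuardSketch guard = new MergeGuardSketch();
    System.out.println(guard.tryStartMerge()); // prints true
    System.out.println(guard.tryStartMerge()); // prints false
    guard.endMerge();
    System.out.println(guard.tryStartMerge()); // prints true
  }
}
```

A javadoc on the end action could then state which lock it takes and when the merging flag is reset, which is the kind of detail the review asks to have documented.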
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313262501

## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java ##
@@ -28,11 +28,12 @@
   private ChunkHeader chunkHeader;
   private ByteBuffer chunkData;
-  private long deletedAt = -1;
+  private long deletedAt;
-  public Chunk(ChunkHeader header, ByteBuffer buffer) {
+  public Chunk(ChunkHeader header, ByteBuffer buffer, long deletedAt) {
     this.chunkHeader = header;
     this.chunkData = buffer;
+    this.deletedAt = deletedAt;

Review comment:
add a comment?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313820437 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java ## @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.selector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.MergeException; +import org.apache.iotdb.db.utils.MergeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MaxFileMergeFileSelector selects the most files from given seqFiles and unseqFiles which can be + * merged without exceeding given memory budget. 
It always assume the number of timeseries being + * queried at the same time is 1 to maximize the number of file merged. + */ +public class MaxFileMergeFileSelector implements MergeFileSelector { + + private static final Logger logger = LoggerFactory.getLogger(MaxFileMergeFileSelector.class); + private static final String LOG_FILE_COST = "Memory cost of file {} is {}"; + + MergeResource resource; + + long totalCost; + private long memoryBudget; + private long maxSeqFileCost; + + // the number of timeseries being queried at the same time + int concurrentMergeNum = 1; + + /** + * Total metadata size of each file. + */ + private Map fileMetaSizeMap = new HashMap<>(); + /** + * Maximum memory cost of querying a timeseries in each file. + */ + private Map maxSeriesQueryCostMap = new HashMap<>(); + + List selectedUnseqFiles; + List selectedSeqFiles; + + private Collection tmpSelectedSeqFiles; + private long tempMaxSeqFileCost; + + private boolean[] seqSelected; + private int seqSelectedNum; + + public MaxFileMergeFileSelector(MergeResource resource, long memoryBudget) { +this.resource = resource; +this.memoryBudget = memoryBudget; + } + + /** + * Select merge candidates from seqFiles and unseqFiles under the given memoryBudget. + * This process iteratively adds the next unseqFile from unseqFiles and its overlapping seqFiles + * as newly-added candidates and computes their estimated memory cost. If the current cost + * pluses the new cost is still under the budget, accept the unseqFile and the seqFiles as + * candidates, otherwise go to the next iteration. + * The memory cost of a file is calculated in two ways: + *The rough estimation: for a seqFile, the size of its metadata is used for estimation. 
+ *Since in the worst case, the file only contains one timeseries and all its metadata will
+ *be loaded into memory with at most one actual data page (which is negligible) and writing

Review comment: I think we read at least a chunk at a time
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313346240

## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java ##
@@ -612,4 +646,38 @@ public long selfCheck(Map newSchema,
     // so that we can continue to write data into this tsfile.
     return truncatedPosition;
   }
+
+  public int getTotalChunkNum() {
+    return totalChunkNum;
+  }
+
+  public List getChunkMetadata(Path path) throws IOException {

Review comment:
```suggestion
  public List getChunkMetadataList(Path path) throws IOException {
```
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314575470 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java ## @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.recover; + +import static org.apache.iotdb.db.engine.merge.recover.MergeLogger.*; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; +import java.util.Map.Entry; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.task.MergeTask; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.read.common.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * LogAnalyzer scans the "merge.log" file and recovers information such as files of last merge, + * the last available positions of each file and how many timeseries and files have been merged. 
+ */ +public class LogAnalyzer { + + private static final Logger logger = LoggerFactory.getLogger(LogAnalyzer.class); + + private MergeResource resource; + private String taskName; + private File logFile; + + private Map fileLastPositions = new HashMap<>(); + private Map tempFileLastPositions = new HashMap<>(); + + private List mergedPaths = new ArrayList<>(); + private List unmergedPaths; + private List unmergedFiles; + private String currLine; + + private Status status; + + public LogAnalyzer(MergeResource resource, String taskName, File logFile) { +this.resource = resource; +this.taskName = taskName; +this.logFile = logFile; + } + + /** + * Scan through the logs to find out where the last merge has stopped and store the information + * about the progress in the fields. + * @return a Status indicating the completed stage of the last merge. + * @throws IOException + */ + public Status analyze() throws IOException { +status = Status.NONE; +try (BufferedReader bufferedReader = new BufferedReader(new FileReader(logFile))) { + currLine = bufferedReader.readLine(); + if (currLine != null) { +analyzeSeqFiles(bufferedReader); + +analyzeUnseqFiles(bufferedReader); + +analyzeUnmergedSeries(bufferedReader); + +analyzeMergedSeries(bufferedReader, unmergedPaths); + +analyzeMergedFiles(bufferedReader); + } +} +return status; + } + + private void analyzeUnmergedSeries(BufferedReader bufferedReader) throws IOException { +if (!STR_TIMESERIES.equals(currLine)) { + return; +} +long startTime = System.currentTimeMillis(); +List paths = new ArrayList<>(); +while ((currLine = bufferedReader.readLine()) != null) { + if (STR_MERGE_START.equals(currLine)) { +break; + } + paths.add(new Path(currLine)); +} +if (logger.isDebugEnabled()) { + logger.debug("{} found {} seq files after {}ms", taskName, paths.size(), + (System.currentTimeMillis() - startTime)); +} +unmergedPaths = paths; + } + + private void analyzeSeqFiles(BufferedReader bufferedReader) throws IOException { +if 
(!STR_SEQ_FILES.equals(currLine)) { + return; +} +long startTime = System.currentTimeMillis(); +List mergeSeqFiles = new ArrayList<>(); +while ((currLine = bufferedReader.readLine()) != null) { + if (STR_UNSEQ_FILES.equals(currLine)) { +break; + } + Iterator iterator = resource.getSeqFiles().iterator(); + while (iterator.hasNext()) { +TsFileResource seqFile = iterator.next(); +if (seqFile.getFile().getAbsolutePath().equals(currLine)) { Review comment: could this resource list be different from the log file? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313852348 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java ## @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.selector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.MergeException; +import org.apache.iotdb.db.utils.MergeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MaxFileMergeFileSelector selects the most files from given seqFiles and unseqFiles which can be + * merged without exceeding given memory budget. 
It always assume the number of timeseries being + * queried at the same time is 1 to maximize the number of file merged. + */ +public class MaxFileMergeFileSelector implements MergeFileSelector { + + private static final Logger logger = LoggerFactory.getLogger(MaxFileMergeFileSelector.class); + private static final String LOG_FILE_COST = "Memory cost of file {} is {}"; + + MergeResource resource; + + long totalCost; + private long memoryBudget; + private long maxSeqFileCost; + + // the number of timeseries being queried at the same time + int concurrentMergeNum = 1; + + /** + * Total metadata size of each file. + */ + private Map fileMetaSizeMap = new HashMap<>(); + /** + * Maximum memory cost of querying a timeseries in each file. + */ + private Map maxSeriesQueryCostMap = new HashMap<>(); + + List selectedUnseqFiles; + List selectedSeqFiles; + + private Collection tmpSelectedSeqFiles; + private long tempMaxSeqFileCost; + + private boolean[] seqSelected; + private int seqSelectedNum; + + public MaxFileMergeFileSelector(MergeResource resource, long memoryBudget) { +this.resource = resource; +this.memoryBudget = memoryBudget; + } + + /** + * Select merge candidates from seqFiles and unseqFiles under the given memoryBudget. + * This process iteratively adds the next unseqFile from unseqFiles and its overlapping seqFiles + * as newly-added candidates and computes their estimated memory cost. If the current cost + * pluses the new cost is still under the budget, accept the unseqFile and the seqFiles as + * candidates, otherwise go to the next iteration. + * The memory cost of a file is calculated in two ways: + *The rough estimation: for a seqFile, the size of its metadata is used for estimation. 
+ *Since in the worst case, the file only contains one timeseries and all its metadata will + *be loaded into memory with at most one actual data page (which is negligible) and writing + *the timeseries into a new file generate metadata of the similar size, so the size of all + *seqFiles' metadata (generated when writing new chunks) pluses the largest one (loaded + *when reading a timeseries from the seqFiles) is the total estimation of all seqFiles; for + *an unseqFile, since the merge reader may read all chunks of a series to perform a merge + *read, the whole file may be loaded into memory, so we use the file's length as the + *maximum estimation. + *The tight estimation: based on the rough estimation, we scan the file's metadata to + *count the number of chunks for each series, find the series which have the most + *chunks in the file and use its chunk proportion to refine the rough estimation. + * The rough estimation is performed first, if no candidates can be found using rough + * estimation, we run the selection again with tight estimation. + * @return two lists of TsFileResource, the former is selected seqFiles and the latter is + * selected unseqFiles or an empty array if there are n
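Editor's note: the rough estimation described in the Javadoc above can be sketched as a small budget loop. This is only an illustration under stated assumptions, not the code in the pull request: the class `BudgetSelectionSketch`, the `Candidate` type, and the file names are hypothetical, and the sketch ignores the case where two unseq files overlap the same seq file (which a real selector would have to avoid counting twice). The cost of a candidate is the unseq file's length plus the metadata sizes of its overlapping seq files, and the largest seq metadata seen so far is counted once more for the read side.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: every name here is hypothetical and not taken from the pull request. */
final class BudgetSelectionSketch {

  /** One unseq file plus the metadata sizes of the seq files it overlaps. */
  static final class Candidate {
    final String name;
    final long unseqFileLength;
    final long[] seqMetaSizes;

    Candidate(String name, long unseqFileLength, long... seqMetaSizes) {
      this.name = name;
      this.unseqFileLength = unseqFileLength;
      this.seqMetaSizes = seqMetaSizes;
    }
  }

  /** Returns the names of the accepted candidates, in the order they were accepted. */
  static List<String> select(List<Candidate> candidates, long memoryBudget) {
    List<String> selected = new ArrayList<>();
    long totalCost = 0;
    long maxSeqMeta = 0; // largest seq metadata accepted so far (loaded when reading)
    for (Candidate c : candidates) {
      // An unseq file may be loaded entirely, so its length is the rough cost.
      long newCost = c.unseqFileLength;
      long newMax = maxSeqMeta;
      for (long metaSize : c.seqMetaSizes) {
        newCost += metaSize; // metadata generated when writing the new chunks
        newMax = Math.max(newMax, metaSize);
      }
      // Only the growth of the largest metadata adds to the read-side cost.
      newCost += newMax - maxSeqMeta;
      if (totalCost + newCost < memoryBudget) {
        selected.add(c.name);
        totalCost += newCost;
        maxSeqMeta = newMax;
      }
      // Otherwise skip this candidate and try the next one.
    }
    return selected;
  }
}
```

With a budget of 200, a candidate costing 150 is accepted, a following one costing 105 is skipped, and a later one costing 40 still fits. The tight estimation mentioned in the Javadoc would scale these numbers by the chunk proportion of the heaviest series and is not shown here.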
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313693421

## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java ##
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.recover;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+/**
+ * MergeLogger records the progress of a merge in file "merge.log" as text lines.
+ */
+public class MergeLogger {

Review comment: It would be better to give an example of merge.log. Suppose there are 3 sequence files and 2 unsequence files containing 2 timeseries: what would merge.log look like at the end of the merge?
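Editor's note: to make the reviewer's scenario concrete, here is a minimal sketch of how a sectioned text log of this kind could be laid out and read back. It assumes only what the quoted diffs show: the section markers `seqFiles`, `unseqFiles` and `timeseries`, each followed by one entry per line. The class `MergeLogSketch`, the file names and the series paths are made up for illustration, and the progress entries written after the header (the `start`/`end` markers) are left out because their exact layout is not visible in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch only: a hypothetical reader for the header part of a merge.log-style file. */
final class MergeLogSketch {

  // Marker strings as they appear in the quoted MergeLogger constants.
  static final Set<String> MARKERS =
      new HashSet<>(Arrays.asList("seqFiles", "unseqFiles", "timeseries"));

  /** Hypothetical header for 3 sequence files, 2 unsequence files and 2 timeseries. */
  static List<String> exampleLog() {
    return Arrays.asList(
        "seqFiles",
        "seq-1.tsfile",
        "seq-2.tsfile",
        "seq-3.tsfile",
        "unseqFiles",
        "unseq-1.tsfile",
        "unseq-2.tsfile",
        "timeseries",
        "root.sg.d1.s1",
        "root.sg.d1.s2");
  }

  /** Groups every non-marker line under the most recent marker line. */
  static Map<String, List<String>> parseSections(List<String> lines) {
    Map<String, List<String>> sections = new LinkedHashMap<>();
    List<String> current = null;
    for (String line : lines) {
      if (MARKERS.contains(line)) {
        current = new ArrayList<>();
        sections.put(line, current);
      } else if (current != null) {
        current.add(line);
      }
      // Lines before the first marker are ignored in this sketch.
    }
    return sections;
  }
}
```

Calling `MergeLogSketch.parseSections(MergeLogSketch.exampleLog())` yields three sections holding 3, 2 and 2 entries respectively, which mirrors the marker-by-marker scan that the quoted LogAnalyzer performs.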
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313691947 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java ## @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.recover; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.List; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.read.common.Path; + +/** + * MergeLogger records the progress of a merge in file "merge.log" as text lines. 
+ */
+public class MergeLogger {
+
+  public static final String MERGE_LOG_NAME = "merge.log";
+
+  static final String STR_SEQ_FILES = "seqFiles";
+  static final String STR_UNSEQ_FILES = "unseqFiles";
+  static final String STR_TIMESERIES = "timeseries";
+  static final String STR_START = "start";
+  static final String STR_END = "end";

Review comment:
```suggestion
  static final String STR_TS_END = "ts end";
```
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313686847 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java ## @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.merge.recover.LogAnalyzer;
+import org.apache.iotdb.db.engine.merge.recover.LogAnalyzer.Status;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.merge.selector.MaxSeriesMergeFileSelector;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RecoverMergeTask is an extension of MergeTask, which resumes the last merge progress by
+ * scanning merge.log using LogAnalyzer and continue the unfinished merge.
+ */
+public class RecoverMergeTask extends MergeTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
+
+  private LogAnalyzer analyzer;
+
+  public RecoverMergeTask(List seqFiles,
+      List unseqFiles, String storageGroupDir,

Review comment:
```suggestion
      List unseqFiles, String storageGroupSysDir,
```
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313204795

## File path: server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java ##
@@ -109,52 +109,6 @@ public long assignJobId() {
     return jobId;
   }
-  /**

Review comment: the JavaDoc of this class needs to be cleaned up
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313716591 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java ## @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.merge.task.MergeTask; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.service.IService; +import org.apache.iotdb.db.service.ServiceType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeManager provides a ThreadPool to queue and run all merge tasks to restrain the total + * resources occupied by merge and manages a Timer to periodically issue a global merge. + */ +public class MergeManager implements IService { + + private static final Logger logger = LoggerFactory.getLogger(MergeManager.class); + private static final MergeManager INSTANCE = new MergeManager(); + + private AtomicInteger threadCnt = new AtomicInteger(); + private ThreadPoolExecutor mergeTaskPool; + private ThreadPoolExecutor mergeChunkSubTaskPool; + private ScheduledExecutorService timedMergeThreadPool; + + private MergeManager() { + } + + public static MergeManager getINSTANCE() { +return INSTANCE; + } + + public void submitMainTask(MergeTask mergeTask) { +mergeTaskPool.submit(mergeTask); + } + + public Future submitChunkSubTask(Callable callable) { +return mergeChunkSubTaskPool.submit(callable); + } + + @Override + public void start() { +if (mergeTaskPool == null) { + int threadNum = IoTDBDescriptor.getInstance().getConfig().getMergeThreadNum(); + if (threadNum <= 0) { +threadNum = 1; + } + + int chunkSubThreadNum = IoTDBDescriptor.getInstance().getConfig().getMergeChunkSubThreadNum(); + if (chunkSubThreadNum <= 0) { +chunkSubThreadNum 
= 1; + } + + mergeTaskPool = + (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum, + r -> new Thread(r, "MergeThread-" + threadCnt.getAndIncrement())); + mergeChunkSubTaskPool = + (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum * chunkSubThreadNum, + r -> new Thread(r, "MergeChunkSubThread-" + threadCnt.getAndIncrement())); + long mergeInterval = IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec(); + if (mergeInterval > 0) { +timedMergeThreadPool = Executors.newSingleThreadScheduledExecutor( r -> new Thread(r, +"TimedMergeThread")); +timedMergeThreadPool.scheduleAtFixedRate(this::flushAll, mergeInterval, +mergeInterval, TimeUnit.SECONDS); + } + logger.info("MergeManager started"); +} + } + + @Override + public void stop() { +if (mergeTaskPool != null) { + if (timedMergeThreadPool != null) { +timedMergeThreadPool.shutdownNow(); +timedMergeThreadPool = null; + } + mergeTaskPool.shutdownNow(); + mergeChunkSubTaskPool.shutdownNow(); + logger.info("Waiting for task pool to shut down"); + while (!mergeTaskPool.isTerminated() || !mergeChunkSubTaskPool.isTerminated() ) { +// wait + } + mergeTaskPool = null; + logger.info("MergeManager stopped"); +} + } + + @Override + public ServiceType getID() { +return ServiceType.MERGE_SERVICE; + } + + private void flushAll() { Review comment: ```suggestion private void mergeAll() { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL
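Editor's note: the quoted `start()` and `stop()` methods clamp the configured thread counts to at least 1, build fixed pools with named threads, and then wait for termination in an empty loop. The sketch below shows the same three steps using only standard `java.util.concurrent` calls, with `awaitTermination` as one possible alternative to spinning. It is not the pull request's code: the class `MergePoolSketch` and its method names are hypothetical, and the timeout handling is an assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch only: hypothetical helpers, not part of MergeManager. */
final class MergePoolSketch {

  /** Mirrors the "set to 1 when less than or equal to 0" rule from the config comments. */
  static int clampThreadNum(int configured) {
    return configured <= 0 ? 1 : configured;
  }

  /** Creates threads named prefix-0, prefix-1, ... with a counter private to this factory. */
  static ThreadFactory namedFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return r -> new Thread(r, prefix + "-" + counter.getAndIncrement());
  }

  static ExecutorService newNamedPool(int configuredThreads, String prefix) {
    return Executors.newFixedThreadPool(clampThreadNum(configuredThreads), namedFactory(prefix));
  }

  /**
   * Interrupts running tasks and blocks until the pool terminates or the timeout elapses.
   * Returns true if the pool terminated in time.
   */
  static boolean shutdownAndWait(ExecutorService pool, long timeout, TimeUnit unit) {
    pool.shutdownNow();
    try {
      return pool.awaitTermination(timeout, unit);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe the interruption.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Blocking in `awaitTermination` lets the calling thread sleep until the pool is done instead of burning CPU, and the timeout gives `stop()` a way to log and give up if a merge task ignores interruption.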
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313735585 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String me
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313353051 ## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java ## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.write.writer; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +/** + * ForceAppendTsFileWriter opens a COMPLETE TsFile, reads and truncate its metadata to support + * appending new data. 
+ */ +public class ForceAppendTsFileWriter extends TsFileIOWriter{ + + private Map knownSchemas = new HashMap<>(); + private long truncatePosition; + + public ForceAppendTsFileWriter(File file) throws IOException { +this.out = new DefaultTsFileOutput(file, true); +this.file = file; + +// file doesn't exist +if (file.length() == 0 || !file.exists()) { + throw new TsFileNotCompleteException("File " + file.getPath() + " is not a complete TsFile"); +} + +try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), true)) { + + // this tsfile is not complete + if (!reader.isComplete()) { +throw new TsFileNotCompleteException("File " + file.getPath() + " is not a complete TsFile"); + } + TsFileMetaData fileMetaData = reader.readFileMetadata(); + Map deviceMap = fileMetaData.getDeviceMap(); + long firstDeviceMetaPos = Long.MAX_VALUE; + for (Entry deviceMetadataEntry : deviceMap.entrySet()) { +TsDeviceMetadataIndex deviceMetadataIndex = deviceMetadataEntry.getValue(); Review comment: ```suggestion ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313694613 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeResource manages files and caches of readers, writers, MeasurementSchemas and + * modifications to avoid unnecessary object creations and file openings. 
+ */ +public class MergeResource { + + private static final Logger logger = LoggerFactory.getLogger(MergeResource.class); + + private List seqFiles; + private List unseqFiles; + + private QueryContext mergeContext = new QueryContext(); + + private Map fileReaderCache; + private Map fileWriterCache = new HashMap<>(); + private Map> modificationCache = new HashMap<>(); + private Map measurementSchemaMap = new HashMap<>(); + private Map chunkWriterCache = new ConcurrentHashMap<>(); + + private boolean cacheDeviceMeta = false; + + public MergeResource(List seqFiles, List unseqFiles) { +this.seqFiles = seqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList()); +this.unseqFiles = + unseqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList()); +this.fileReaderCache = new HashMap<>(); + } + + public void clear() throws IOException { +for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) { + sequenceReader.close(); +} + +fileReaderCache.clear(); +fileWriterCache.clear(); Review comment: Do RestorableTsFileIOWriters in fileWriterCache need to be closed? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
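The question above is about resource lifetime: `clear()` closes every cached reader but only drops the references to the cached writers. Below is a minimal sketch of the close-before-clear pattern, using plain `java.io.Closeable` stand-ins. `ResourceCache` and its methods are hypothetical and are not IoTDB code; whether `RestorableTsFileIOWriter` actually needs an explicit close here is the open question in the review.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a cache such as fileWriterCache; not IoTDB code.
final class ResourceCache {

  private final Map<String, Closeable> cache = new HashMap<>();

  void put(String key, Closeable resource) {
    cache.put(key, resource);
  }

  int size() {
    return cache.size();
  }

  // Close every cached resource, then drop the references.
  // All resources are attempted; the first failure is rethrown afterwards.
  void clear() throws IOException {
    IOException first = null;
    for (Closeable resource : cache.values()) {
      try {
        resource.close();
      } catch (IOException e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    cache.clear();
    if (first != null) {
      throw first;
    }
  }
}
```

Collecting the first exception rather than aborting on it is one possible design choice: it avoids leaking the remaining resources when one close fails.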
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314202063 ## File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java ## @@ -185,53 +204,95 @@ public StorageGroupProcessor(String systemInfoDir, String storageGroupName) private void recover() throws ProcessorException { logger.info("recover Storage Group {}", storageGroupName); -// collect TsFiles from sequential data directory -List tsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders()); -recoverSeqFiles(tsFiles); - -// collect TsFiles from unsequential data directory -tsFiles = getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders()); -recoverUnseqFiles(tsFiles); +try { + // collect TsFiles from sequential and unsequential data directory + List seqTsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders()); + List unseqTsFiles = + getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders()); + + recoverSeqFiles(seqTsFiles); + recoverUnseqFiles(unseqTsFiles); + + String taskName = storageGroupName + "-" + System.currentTimeMillis(); + File mergingMods = new File(storageGroupSysDir, MERGING_MODIFICAITON_FILE_NAME); + if (mergingMods.exists()) { +mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME); + } + RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles, + storageGroupSysDir.getPath(), this::mergeEndAction, taskName, + IoTDBDescriptor.getInstance().getConfig().isForceFullMerge()); + logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName); + recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()); + if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) { +mergingMods.delete(); + } +} catch (IOException e) { + 
throw new ProcessorException(e); +} for (TsFileResource resource : sequenceFileList) { latestTimeForEachDevice.putAll(resource.getEndTimeMap()); latestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap()); } } - private List getAllFiles(List folders) { + private List getAllFiles(List folders) throws IOException { List tsFiles = new ArrayList<>(); for (String baseDir : folders) { File fileFolder = new File(baseDir, storageGroupName); if (!fileFolder.exists()) { continue; } + // some TsFileResource may be persisting when the system crashed, try recovering such Review comment: ```suggestion // some TsFileResource may be being persisted when the system crashed, try recovering such ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313298216

## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoader.java ##
@@ -34,4 +34,8 @@
    */
   void close() throws IOException;

+  /**
+   * clear Chunk cache if used.
+   */
+  void clear();

Review comment:
Rename this class to IChunkLoader?
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313705442 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeResource manages files and caches of readers, writers, MeasurementSchemas and + * modifications to avoid unnecessary object creations and file openings. + */ +public class MergeResource { + + private static final Logger logger = LoggerFactory.getLogger(MergeResource.class); + + private List seqFiles; + private List unseqFiles; + + private QueryContext mergeContext = new QueryContext(); Review comment: this is unused now, remove ? 
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314208495 ## File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java ## @@ -674,6 +739,136 @@ public void closeUnsealedTsFileProcessor( } } + public void merge(boolean fullMerge) { +writeLock(); +try { + if (isMerging) { +if (logger.isInfoEnabled()) { + logger.info("{} Last merge is ongoing, currently consumed time: {}ms", storageGroupName, + (System.currentTimeMillis() - mergeStartTime)); +} +return; + } + if (unSequenceFileList.isEmpty() || sequenceFileList.isEmpty()) { +logger.info("{} no files to be merged", storageGroupName); +return; + } + + long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget(); + MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList); + MergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource); + try { +List[] mergeFiles = fileSelector.select(); +if (mergeFiles.length == 0) { + logger.info("{} cannot select merge candidates under the budget {}", storageGroupName, + budget); + return; +} +// avoid pending tasks holds the metadata and streams +mergeResource.clear(); +String taskName = storageGroupName + "-" + System.currentTimeMillis(); +// do not cache metadata until true candidates are chosen, or too much metadata will be +// cached during selection +mergeResource.setCacheDeviceMeta(true); + +MergeTask mergeTask = new MergeTask(mergeResource, storageGroupSysDir.getPath(), +this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum()); +mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME); +MergeManager.getINSTANCE().submitMainTask(mergeTask); +if (logger.isInfoEnabled()) { + logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles", + storageGroupName, taskName, 
mergeFiles[0].size(), mergeFiles[1].size()); +} +isMerging = true; +mergeStartTime = System.currentTimeMillis(); + + } catch (MergeException | IOException e) { +logger.error("{} cannot select file for merge", storageGroupName, e); + } +} finally { + writeUnlock(); +} + } + + private MergeFileSelector getMergeFileSelector(long budget, MergeResource resource) { +MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy(); +switch (strategy) { + case MAX_FILE_NUM: +return new MaxFileMergeFileSelector(resource, budget); + case MAX_SERIES_NUM: +return new MaxSeriesMergeFileSelector(resource, budget); + default: +throw new UnsupportedOperationException("Unknown MergeFileStrategy " + strategy); +} + } + + protected void mergeEndAction(List seqFiles, List unseqFiles, + File mergeLog) { +logger.info("{} a merge task is ending...", storageGroupName); + +if (unseqFiles.isEmpty()) { + // merge runtime exception arose, just end this merge + isMerging = false; + logger.info("{} a merge task abnormally ends", storageGroupName); + return; +} + +mergeLock.writeLock().lock(); +try { + unSequenceFileList.removeAll(unseqFiles); +} finally { + mergeLock.writeLock().unlock(); +} + +for (int i = 0; i < unseqFiles.size(); i++) { + TsFileResource unseqFile = unseqFiles.get(i); + unseqFile.getMergeQueryLock().writeLock().lock(); + try { +unseqFile.remove(); +unseqFiles.remove(unseqFile); Review comment: is this needed? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
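One reason the `unseqFiles.remove(unseqFile)` call inside the indexed loop deserves the question above: removing the current element of a `java.util.List` while advancing the index skips the element that shifts into its place. The sketch below shows this with plain strings. It assumes an `ArrayList`-like list; the class and method names are hypothetical, and it does not claim anything about how the PR resolved the comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo; file names stand in for TsFileResource objects.
final class RemoveWhileIterating {

  // Same loop shape as in the diff: fetch element i, process it, remove it, then i++.
  static List<String> visitedWithRemoval(List<String> input) {
    List<String> files = new ArrayList<>(input);
    List<String> visited = new ArrayList<>();
    for (int i = 0; i < files.size(); i++) {
      String file = files.get(i);
      visited.add(file);
      files.remove(file); // later elements shift left, so the next one is skipped
    }
    return visited;
  }

  // Without the in-loop removal every element is visited exactly once.
  static List<String> visitedWithoutRemoval(List<String> input) {
    List<String> visited = new ArrayList<>();
    for (String file : input) {
      visited.add(file);
    }
    return visited;
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList("u1", "u2", "u3", "u4");
    System.out.println(visitedWithRemoval(files));    // prints [u1, u3]
    System.out.println(visitedWithoutRemoval(files)); // prints [u1, u2, u3, u4]
  }
}
```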
[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
qiaojialin commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313691907

## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java ##
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.recover;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+/**
+ * MergeLogger records the progress of a merge in file "merge.log" as text lines.
+ */
+public class MergeLogger {
+
+  public static final String MERGE_LOG_NAME = "merge.log";
+
+  static final String STR_SEQ_FILES = "seqFiles";
+  static final String STR_UNSEQ_FILES = "unseqFiles";
+  static final String STR_TIMESERIES = "timeseries";
+  static final String STR_START = "start";

Review comment:
```suggestion
  static final String STR_TS_START = "ts start";
```
[GitHub] [incubator-iotdb] little-emotion opened a new pull request #341: increase configuration parameters of cache
little-emotion opened a new pull request #341: increase configuration parameters of cache
URL: https://github.com/apache/incubator-iotdb/pull/341

Add configuration parameters to the metadata caching module:
1. A parameter that enables or disables the cache.
2. Parameters for the memory occupancy ratios of the tsFileMetaData cache and the chunkMetaData cache.
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314578681

## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Path.java ##
@@ -185,7 +185,7 @@ public int hashCode() {

   @Override
   public boolean equals(Object obj) {
-    return obj != null && obj instanceof Path && this.fullPath.equals(((Path) obj).fullPath);
+    return obj instanceof Path && this.fullPath.equals(((Path) obj).fullPath);

Review comment:
instanceof always evaluates to false for null, so the explicit null check is redundant.
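The point about `instanceof` can be checked with a small self-contained class. This is only a sketch: `Name` is a hypothetical stand-in for `Path` that keeps the same `equals` shape as the changed line.

```java
// Hypothetical stand-in for Path; only the equals/hashCode shape matters here.
final class InstanceofNullDemo {

  static final class Name {
    private final String fullPath;

    Name(String fullPath) {
      this.fullPath = fullPath;
    }

    @Override
    public boolean equals(Object obj) {
      // No separate null check: "null instanceof Name" is false by definition.
      return obj instanceof Name && this.fullPath.equals(((Name) obj).fullPath);
    }

    @Override
    public int hashCode() {
      return fullPath.hashCode();
    }
  }

  public static void main(String[] args) {
    Name a = new Name("root.sg.d1.s1");
    System.out.println(a.equals(null));                      // prints false
    System.out.println(a.equals(new Name("root.sg.d1.s1"))); // prints true
    System.out.println(a.equals("root.sg.d1.s1"));           // prints false
  }
}
```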
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314578609

## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetaData.java ##
@@ -108,6 +114,8 @@ public static TsFileMetaData deserializeFrom(InputStream inputStream) throws IOE
     if (ReadWriteIOUtils.readIsNull(inputStream)) {
       fileMetaData.createdBy = ReadWriteIOUtils.readString(inputStream);
     }
+    fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(inputStream);

Review comment:
We should discuss compatibility somewhere else.
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314578352

## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetaData.java ##
@@ -151,6 +159,8 @@ public static TsFileMetaData deserializeFrom(ByteBuffer buffer) throws IOExcepti
     if (ReadWriteIOUtils.readIsNull(buffer)) {
       fileMetaData.createdBy = ReadWriteIOUtils.readString(buffer);
     }
+    fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(buffer);

Review comment:
I think keeping compatibility needs a much more delicate mechanism than this, which is beyond this PR.
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314578160

## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java ##
@@ -135,9 +135,9 @@
    */
   public static int pageCheckSizeThreshold = 100;
   /**
-   * Default endian value is LITTLE_ENDIAN.
+   * Default endian value is BIG_ENDIAN.

Review comment:
The put methods in ByteBuffer use BIG_ENDIAN by default, so using BIG_ENDIAN as the default may remove a lot of meaningless lines. As for compatibility: no, but you could somehow implement BIG_ENDIAN (which we should have) in 0.8.0 and change the config.
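The `ByteBuffer` behaviour referred to above can be seen with the JDK alone. The sketch below encodes the same `int` once with the buffer's initial order and once after switching to `LITTLE_ENDIAN`. `ByteOrderDemo` and `encode` are hypothetical names, and nothing here touches TsFile's own serialization code.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical demo of ByteBuffer's initial byte order; not TsFile code.
final class ByteOrderDemo {

  // Encodes value with the given order; null means "leave the buffer's initial order".
  static byte[] encode(int value, ByteOrder order) {
    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
    if (order != null) {
      buffer.order(order);
    }
    buffer.putInt(value);
    return buffer.array();
  }

  // A freshly allocated buffer always starts as BIG_ENDIAN, whatever the platform.
  static ByteOrder initialOrder() {
    return ByteBuffer.allocate(1).order();
  }

  public static void main(String[] args) {
    System.out.println(initialOrder());                                      // prints BIG_ENDIAN
    System.out.println(Arrays.toString(encode(1, null)));                    // prints [0, 0, 0, 1]
    System.out.println(Arrays.toString(encode(1, ByteOrder.LITTLE_ENDIAN))); // prints [1, 0, 0, 0]
  }
}
```

With a BIG_ENDIAN default, callers would not need the extra `order(...)` call before each write, which is presumably the kind of line the comment expects to disappear.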
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314577395 ## File path: server/src/main/java/org/apache/iotdb/db/utils/TimeValuePairUtils.java ## @@ -64,17 +74,93 @@ public static TimeValuePair getCurrentTimeValuePair(AggreResultData data) { case INT32: return new TimeValuePair(data.getTimestamp(), new TsPrimitiveType.TsInt(data.getIntRet())); case INT64: -return new TimeValuePair(data.getTimestamp(), new TsPrimitiveType.TsLong(data.getLongRet())); +return new TimeValuePair(data.getTimestamp(), +new TsPrimitiveType.TsLong(data.getLongRet())); case FLOAT: -return new TimeValuePair(data.getTimestamp(), new TsPrimitiveType.TsFloat(data.getFloatRet())); +return new TimeValuePair(data.getTimestamp(), +new TsPrimitiveType.TsFloat(data.getFloatRet())); case DOUBLE: -return new TimeValuePair(data.getTimestamp(), new TsPrimitiveType.TsDouble(data.getDoubleRet())); +return new TimeValuePair(data.getTimestamp(), +new TsPrimitiveType.TsDouble(data.getDoubleRet())); case TEXT: -return new TimeValuePair(data.getTimestamp(), new TsPrimitiveType.TsBinary(data.getBinaryRet())); +return new TimeValuePair(data.getTimestamp(), +new TsPrimitiveType.TsBinary(data.getBinaryRet())); case BOOLEAN: -return new TimeValuePair(data.getTimestamp(), new TsPrimitiveType.TsBoolean(data.isBooleanRet())); +return new TimeValuePair(data.getTimestamp(), +new TsPrimitiveType.TsBoolean(data.isBooleanRet())); default: throw new UnSupportedDataTypeException(String.valueOf(data.getDataType())); } } -} + + public static void setCurrentTimeValuePair(BatchData data, TimeValuePair current) { +current.setTimestamp(data.currentTime()); +switch (data.getDataType()) { + case INT32: +current.getValue().setInt(data.getInt()); +break; + case INT64: +current.getValue().setLong(data.getLong()); +break; + case FLOAT: +current.getValue().setFloat(data.getFloat()); +break; + case DOUBLE: 
+current.getValue().setDouble(data.getDouble()); +break; + case TEXT: +current.getValue().setBinary(data.getBinary()); +break; + case BOOLEAN: +current.getValue().setBoolean(data.getBoolean()); +break; + default: +throw new UnSupportedDataTypeException(String.valueOf(data.getDataType())); +} + } + + public static void setTimeValuePair(TimeValuePair from, TimeValuePair to) { +to.setTimestamp(from.getTimestamp()); +switch (from.getValue().getDataType()) { + case INT32: +to.getValue().setInt(from.getValue().getInt()); +break; + case INT64: +to.getValue().setLong(from.getValue().getLong()); +break; + case FLOAT: +to.getValue().setFloat(from.getValue().getFloat()); +break; + case DOUBLE: +to.getValue().setDouble(from.getValue().getDouble()); +break; + case TEXT: +to.getValue().setBinary(from.getValue().getBinary()); +break; + case BOOLEAN: +to.getValue().setBoolean(from.getValue().getBoolean()); +break; + default: +throw new UnSupportedDataTypeException(String.valueOf(from.getValue().getDataType())); +} + } + + public static TimeValuePair getEmptyTimeValuePair(TSDataType dataType) { +switch (dataType) { + case FLOAT: +return new TimeValuePair(0, new TsFloat(0.0f)); Review comment: Where? This is not a method like Collections.emptyMap() at all. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314577226

## File path: server/src/main/java/org/apache/iotdb/db/service/ServiceType.java ##
@@ -31,6 +31,7 @@
   AUTHORIZATION_SERVICE("Authorization ServerService", ""),
   FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
   SYNC_SERVICE("SYNC ServerService", ""),
+  MERGE_SERVICE("Merge Manager", ""),

Review comment:
I would like to leave it as future work.
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314577062 ## File path: server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java ## @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader.chunkRelated; + +import java.io.IOException; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.db.utils.TimeValuePairUtils; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; + +public class CachedDiskChunkReader implements IPointReader { Review comment: I cannot. The next() in DiskChunkReader does not throw IOException while the one in CachedDiskChunkReader does. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
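The request being answered is not quoted in this message. Assuming it was to derive `CachedDiskChunkReader` from `DiskChunkReader`, the obstacle is the Java rule that an overriding method may not declare checked exceptions the overridden method does not declare. A minimal sketch with hypothetical stand-in types (`PointReader`, `PlainReader` and `CachedReader` are not the IoTDB classes):

```java
import java.io.IOException;

// Hypothetical stand-ins; these are not the IoTDB reader classes.
final class OverrideThrowsDemo {

  interface PointReader {
    int next() throws IOException;
  }

  // An implementation may declare fewer checked exceptions than the interface.
  static class PlainReader implements PointReader {
    @Override
    public int next() {
      return 1;
    }
  }

  // A subclass of PlainReader that redeclared "next() throws IOException" would be
  // rejected by the compiler, because an override may not add checked exceptions.
  // Implementing the interface directly keeps the throws clause available.
  static class CachedReader implements PointReader {
    private final boolean failOnRead;

    CachedReader(boolean failOnRead) {
      this.failOnRead = failOnRead;
    }

    @Override
    public int next() throws IOException {
      if (failOnRead) {
        throw new IOException("simulated read failure");
      }
      return 2;
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(new PlainReader().next());       // prints 1
    System.out.println(new CachedReader(false).next()); // prints 2
  }
}
```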
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314576748

## File path: server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java ##
@@ -109,52 +109,6 @@ public long assignJobId() {
     return jobId;
   }

-  /**
-   * Begin query and set query tokens of queryPaths. This method is used for projection
-   * calculation.
-   */
-  public void beginQueryOfGivenQueryPaths(long jobId, List queryPaths)

Review comment:
Updated. Tokens were used to prevent a merge from deleting files that are being queried; they have been replaced by the locks in TsFileResource.
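A rough sketch of how a per-file read-write lock can take over the role of query tokens: queries hold the read lock while they use the file, and the merge needs the write lock before it may delete it. `GuardedFile` and its methods are hypothetical. The PR's `TsFileResource` exposes `getMergeQueryLock()` and the merge side calls the blocking `writeLock().lock()`; `tryLock()` is used here only so the example can run in a single thread.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for a file resource guarded by a merge/query lock.
final class GuardedFile {

  private final ReentrantReadWriteLock mergeQueryLock = new ReentrantReadWriteLock();
  private boolean deleted = false;

  // Query side: take the read lock; returns false if the file is already gone.
  boolean beginQuery() {
    mergeQueryLock.readLock().lock();
    if (deleted) {
      mergeQueryLock.readLock().unlock();
      return false;
    }
    return true;
  }

  // Query side: release the read lock taken by a successful beginQuery().
  void endQuery() {
    mergeQueryLock.readLock().unlock();
  }

  // Merge side: delete only if no query currently holds the read lock.
  boolean tryRemove() {
    if (!mergeQueryLock.writeLock().tryLock()) {
      return false;
    }
    try {
      deleted = true;
      return true;
    } finally {
      mergeQueryLock.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    GuardedFile file = new GuardedFile();
    file.beginQuery();
    System.out.println(file.tryRemove()); // prints false: a query is still reading
    file.endQuery();
    System.out.println(file.tryRemove()); // prints true
    System.out.println(file.beginQuery()); // prints false: the file was removed
  }
}
```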
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314574919

## File path: server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java ##
@@ -34,7 +35,7 @@ import org.slf4j.LoggerFactory;
 /**
- * FileReaderManager is a singleton, which is used to manage
+ * resource.getSeqFiles()ager is a singleton, which is used to manage

Review comment: Fixed.
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314574804

## File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java ##
@@ -185,53 +204,95 @@ public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
 private void recover() throws ProcessorException {
 logger.info("recover Storage Group {}", storageGroupName);
-// collect TsFiles from sequential data directory
-List tsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
-recoverSeqFiles(tsFiles);
-
-// collect TsFiles from unsequential data directory
-tsFiles = getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
-recoverUnseqFiles(tsFiles);
+try {
+ // collect TsFiles from sequential and unsequential data directory
+ List seqTsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
+ List unseqTsFiles =
+ getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+
+ recoverSeqFiles(seqTsFiles);
+ recoverUnseqFiles(unseqTsFiles);
+
+ String taskName = storageGroupName + "-" + System.currentTimeMillis();
+ File mergingMods = new File(storageGroupSysDir, MERGING_MODIFICAITON_FILE_NAME);
+ if (mergingMods.exists()) {
+mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
+ }
+ RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles,

Review comment: This process includes repairing the merged files, so it must be done before the database goes online. You can limit it to the repair part by setting the config continue_merge_after_reboot to false, but it cannot be made asynchronous.
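The ordering described in this comment can be pictured as follows. This is only a sketch: the step names and the StartupOrderSketch class are invented for illustration, and only the continue_merge_after_reboot flag comes from the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the startup order; step names are made up.
class StartupOrderSketch {

  static List<String> startupSteps(boolean continueMergeAfterReboot) {
    List<String> steps = new ArrayList<>();
    steps.add("recover files");
    // the repair of merged files always runs, and always before going online
    steps.add("repair merged files");
    if (continueMergeAfterReboot) {
      // optional: finish the merge that was interrupted by the crash
      steps.add("continue crashed merge");
    }
    steps.add("go online");
    return steps;
  }
}
```

With the flag set to false the optional step is skipped, which shortens the reboot, but the repair step still runs synchronously in both cases.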
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314574478 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java ## @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.task; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.commons.io.FileUtils; +import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache; +import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache; +import org.apache.iotdb.db.engine.merge.manage.MergeContext; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.recover.MergeLogger; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; +import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.schema.FileSchema; +import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeFileTask merges the merge temporary files with the seqFiles, either move the merged + * chunks in the temp files into the seqFiles or move the unmerged chunks into the merge temp + * files, depending on which one is the majority. 
+ */ +class MergeFileTask { + + private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class); + + private String taskName; + private MergeContext context; + private MergeLogger mergeLogger; + private MergeResource resource; + private List unmergedFiles; + + MergeFileTask(String taskName, MergeContext context, MergeLogger mergeLogger, + MergeResource resource, List unmergedSeqFiles) { +this.taskName = taskName; +this.context = context; +this.mergeLogger = mergeLogger; +this.resource = resource; +this.unmergedFiles = unmergedSeqFiles; + } + + void mergeFiles() throws IOException { +// decide whether to write the unmerged chunks to the merge files or to move the merged chunks +// back to the origin seqFile's +if (logger.isInfoEnabled()) { + logger.info("{} starts to merge {} files", taskName, unmergedFiles.size()); +} +long startTime = System.currentTimeMillis(); +int cnt = 0; +for (TsFileResource seqFile : unmergedFiles) { + int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0); + int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0); + if (mergedChunkNum >= unmergedChunkNum) { +// move the unmerged data to the new file +if (logger.isInfoEnabled()) { + logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} " + + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum); +} +moveUnmergedToNew(seqFile); + } else { +// move the merged data to the old file +if (logger.isInfoEnabled()) { + logger.info("{} moving merged data of {} to the old file {} merged chunks, {} " + + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum); +} +moveMergedToOld(seqFile); + } + cnt ++; + if (logger.isInfoEnabled()) { +logger.debug("{} has merged {}/{} files", taskName, cnt, unmergedFiles.size()); + } +} +if (logger.isInfoEnabled()) { + logger.info("{} has merged all files after {}ms", taskName, System.currentTimeMillis() - 
startTime); +} +mergeLogger.logMergeEnd(); + } + + private void moveMergedToOld(TsFileResource seqFile) throws
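The choice made in mergeFiles() above can be reduced to a small comparison: whichever side holds fewer chunks gets moved. The sketch below restates that rule with a hypothetical enum and class name; the real task additionally performs the file I/O and logging shown in the diff.

```java
// Hypothetical reduction of the decision in mergeFiles(); names are illustrative.
class MergeDirectionSketch {

  enum Direction {
    MOVE_UNMERGED_TO_NEW, // write the unmerged chunks into the temp merge file
    MOVE_MERGED_TO_OLD    // write the merged chunks back into the old seqFile
  }

  // Mirrors the condition "mergedChunkNum >= unmergedChunkNum" from the diff.
  static Direction choose(int mergedChunkNum, int unmergedChunkNum) {
    return mergedChunkNum >= unmergedChunkNum
        ? Direction.MOVE_UNMERGED_TO_NEW
        : Direction.MOVE_MERGED_TO_OLD;
  }
}
```

For example, a file with 8 merged and 2 unmerged chunks has only 2 chunks copied into the new file, rather than 8 copied back into the old one.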
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314574372 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java ## @@ -0,0 +1,424 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.task; + +import static org.apache.iotdb.db.utils.MergeUtils.writeBatchPoint; +import static org.apache.iotdb.db.utils.MergeUtils.writeTVPair; +import static org.apache.iotdb.db.utils.QueryUtils.modifyChunkMetaData; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.PriorityQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.merge.manage.MergeContext; +import org.apache.iotdb.db.engine.merge.manage.MergeManager; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.recover.MergeLogger; +import org.apache.iotdb.db.engine.merge.selector.MergePathSelector; +import org.apache.iotdb.db.engine.merge.selector.NaivePathSelector; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.db.utils.MergeUtils.MetaListEntry; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class MergeMultiChunkTask { + + private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class); + private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig() + .getChunkMergePointThreshold(); + + private MergeLogger mergeLogger; + private List unmergedSeries; + + private String taskName; + private MergeResource resource; + private TimeValuePair[] currTimeValuePairs; + private boolean fullMerge; + + private MergeContext mergeContext; + + private AtomicInteger mergedChunkNum = new AtomicInteger(); + private AtomicInteger unmergedChunkNum = new AtomicInteger(); + private int mergedSeriesCnt; + private double progress; + + private int concurrentMergeSeriesNum; + private List currMergingPaths = new ArrayList<>(); + + MergeMultiChunkTask(MergeContext context, String taskName, MergeLogger mergeLogger, + MergeResource mergeResource, boolean fullMerge, List unmergedSeries, + int concurrentMergeSeriesNum) { +this.mergeContext = context; +this.taskName = taskName; +this.mergeLogger = mergeLogger; +this.resource = mergeResource; +this.fullMerge = fullMerge; +this.unmergedSeries = unmergedSeries; +this.concurrentMergeSeriesNum = concurrentMergeSeriesNum; + } + + void mergeSeries() throws IOException { +if (logger.isInfoEnabled()) { + logger.info("{} starts to merge {} series", taskName, unmergedSeries.size()); +} +long startTime = System.currentTimeMillis(); +for (TsFileResource seqFile : resource.getSeqFiles()) { + mergeContext.getUnmergedChunkStartTimes().put(seqFile, new HashMap<>()); +} +// merge each series and write data into each seqFile's corresponding temp merge file +List> devicePaths = MergeUtils.splitPathsByDevice(unmergedSeries); Revi
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314574266 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java ## @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.recover; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.List; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.read.common.Path; + +/** + * MergeLogger records the progress of a merge in file "merge.log" as text lines. 
+ */ +public class MergeLogger { + + public static final String MERGE_LOG_NAME = "merge.log"; + + static final String STR_SEQ_FILES = "seqFiles"; + static final String STR_UNSEQ_FILES = "unseqFiles"; + static final String STR_TIMESERIES = "timeseries"; + static final String STR_START = "start"; + static final String STR_END = "end"; + static final String STR_ALL_TS_END = "all ts end"; + static final String STR_MERGE_START = "merge start"; + static final String STR_MERGE_END = "merge end"; + + private BufferedWriter logStream; + + public MergeLogger(String storageGroupDir) throws IOException { +logStream = new BufferedWriter(new FileWriter(new File(storageGroupDir, MERGE_LOG_NAME), true)); + } + + public void close() throws IOException { +logStream.close(); + } + + public void logTSStart(List paths) throws IOException { +logStream.write(STR_START); +for (Path path : paths) { + logStream.write(" " + path.getFullPath()); +} +logStream.newLine(); +logStream.flush(); + } + + public void logFilePositionUpdate(File file) throws IOException { +logStream.write(String.format("%s %d", file.getAbsolutePath(), file.length())); +logStream.newLine(); +logStream.flush(); + } + + public void logTSEnd() throws IOException { +logStream.write(STR_END); +logStream.newLine(); +logStream.flush(); + } + + public void logAllTsEnd() throws IOException { +logStream.write(STR_ALL_TS_END); +logStream.newLine(); +logStream.flush(); + } + + public void logFileMergeStart(File file, long position) throws IOException { +logStream.write(String.format("%s %d", file.getAbsolutePath(), position)); +logStream.newLine(); +logStream.flush(); + } + + public void logFileMergeEnd() throws IOException { +logStream.write(STR_END); +logStream.newLine(); +logStream.flush(); + } + + public void logMergeEnd() throws IOException { +logStream.write(STR_MERGE_END); +logStream.newLine(); +logStream.flush(); + } + + public void logFiles(MergeResource resource) throws IOException { +logSeqFiles(resource.getSeqFiles()); 
+logUnseqFiles(resource.getUnseqFiles());
+ }
+
+ public void logAllTS(List paths) throws IOException {

Review comment: The cost is negligible compared to the merge task itself, and it will not last long. That said, I now get the paths from MManager, so recording them is no longer necessary.
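The logging style used by MergeLogger above (append one text line, flush, repeat) can be tried out with the following reduced sketch. ProgressLogSketch is hypothetical, and lastRecord() is an assumption about how such a log might be read back on recovery; the recovery code itself is not part of this message.

```java
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;

// Hypothetical reduction of a text-line progress log; not the real MergeLogger.
class ProgressLogSketch {
  private final BufferedWriter out;

  ProgressLogSketch(File logFile) throws IOException {
    // open in append mode so that earlier records are preserved
    out = new BufferedWriter(new FileWriter(logFile, true));
  }

  void log(String record) throws IOException {
    out.write(record);
    out.newLine();
    out.flush(); // hand each record to the OS as soon as it is written
  }

  void close() throws IOException {
    out.close();
  }

  // Assumed recovery helper: the last line shows how far the task got.
  static String lastRecord(File logFile) throws IOException {
    List<String> lines = Files.readAllLines(logFile.toPath());
    return lines.isEmpty() ? null : lines.get(lines.size() - 1);
  }
}
```

Note that flush() only empties the Java-side buffer; it does not force the data to disk.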
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314573246 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.task; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.iotdb.db.engine.merge.manage.MergeContext; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.recover.MergeLogger; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.read.common.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeTask merges given seqFiles and unseqFiles into a new one, which basically consists of three + * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files + *2. 
move the merged chunks in the temp files back to the seqFiles or move the unmerged + *chunks in the seqFiles int temp files and replace the seqFiles with the temp files. + *3. remove unseqFiles + */ +public class MergeTask implements Callable { + + public static final String MERGE_SUFFIX = ".merge"; + private static final Logger logger = LoggerFactory.getLogger(MergeTask.class); + + MergeResource resource; + String storageGroupDir; + MergeLogger mergeLogger; + MergeContext mergeContext = new MergeContext(); + + private MergeCallback callback; + int concurrentMergeSeriesNum; + String taskName; + boolean fullMerge; + + MergeTask(List seqFiles, + List unseqFiles, String storageGroupDir, MergeCallback callback, + String taskName, boolean fullMerge) { +this.resource = new MergeResource(seqFiles, unseqFiles); +this.storageGroupDir = storageGroupDir; +this.callback = callback; +this.taskName = taskName; +this.fullMerge = fullMerge; +this.concurrentMergeSeriesNum = 1; + } + + public MergeTask(MergeResource mergeResource, String storageGroupDir, MergeCallback callback, + String taskName, boolean fullMerge, int concurrentMergeSeriesNum) { +this.resource = mergeResource; +this.storageGroupDir = storageGroupDir; +this.callback = callback; +this.taskName = taskName; +this.fullMerge = fullMerge; +this.concurrentMergeSeriesNum = concurrentMergeSeriesNum; + } + + @Override + public Void call() throws Exception { +try { + doMerge(); +} catch (Exception e) { + logger.error("Runtime exception in merge {}", taskName, e); + cleanUp(false); + // call the callback to make sure the StorageGroup exit merging status, but passing 2 + // empty file lists to avoid files being deleted. 
+ callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME));
+ throw e;
+}
+return null;
+ }
+
+ private void doMerge() throws IOException {
+if (logger.isInfoEnabled()) {
+ logger.info("{} starts to merge {} seqFiles, {} unseqFiles", taskName,
+ resource.getSeqFiles().size(), resource.getUnseqFiles().size());
+}
+long startTime = System.currentTimeMillis();
+long totalFileSize = MergeUtils.collectFileSizes(resource.getSeqFiles(),
+resource.getUnseqFiles());
+mergeLogger = new MergeLogger(storageGroupDir);
+
+mergeLogger.logFiles(resource);

Review comment: See StorageGroupProcessor.merge() for the answer.
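The failure path in call() above follows a common pattern: on error, still notify the owner so it can leave its "merging" state, but pass empty lists so that no file is removed. A minimal sketch with hypothetical names (FailSafeTaskSketch, Callback) and simplified argument types:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical reduction of the error handling in MergeTask.call().
class FailSafeTaskSketch implements Callable<Void> {

  // Simplified callback: the real one also receives the merge log file.
  interface Callback {
    void call(List<String> seqFiles, List<String> unseqFiles);
  }

  private final Runnable work;
  private final Callback callback;

  FailSafeTaskSketch(Runnable work, Callback callback) {
    this.work = work;
    this.callback = callback;
  }

  @Override
  public Void call() throws Exception {
    try {
      work.run();
    } catch (Exception e) {
      // notify the owner, but with empty lists so that nothing gets deleted
      callback.call(Collections.emptyList(), Collections.emptyList());
      throw e;
    }
    return null;
  }
}
```

Rethrowing after the callback keeps the failure visible to whoever holds the task's Future.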
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314573126 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String mea
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314572799 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat());
+break;
+ default:
+throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType());
+}
+ }
+
+ /**
+ * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them
+ * before return.
+ * @param resource
+ * @return all paths contained in the merge.
+ * @throws IOException
+ */
+ public static List collectPaths(MergeResource resource)
+ throws IOException {
+Set pathSet = new HashSet<>();
+for (TsFileResource tsFileResource : resource.getUnseqFiles()) {
+ TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource);

Review comment: I replied to this before: even if it can, the cost is negligible and not worth the coupling.
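The core of collectPaths() is a de-duplicate-then-sort step over the series found in every file. The sketch below shows that step alone, using plain strings in place of Path objects; CollectPathsSketch and its input shape are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical reduction of collectPaths(): strings stand in for Path objects.
class CollectPathsSketch {

  static List<String> collect(List<List<String>> pathsPerFile) {
    // a set removes series that occur in more than one file
    Set<String> pathSet = new HashSet<>();
    for (List<String> filePaths : pathsPerFile) {
      pathSet.addAll(filePaths);
    }
    // sort by full path so that the result order is deterministic
    List<String> ret = new ArrayList<>(pathSet);
    ret.sort(Comparator.naturalOrder());
    return ret;
  }
}
```

Sorting matters because a HashSet gives no iteration-order guarantee, so without it the series order could differ between runs.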
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314572858 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); Review comment: See response before. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314572600 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.task; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.iotdb.db.engine.merge.manage.MergeContext; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.recover.MergeLogger; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.read.common.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeTask merges given seqFiles and unseqFiles into a new one, which basically consists of three + * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files + *2. 
move the merged chunks in the temp files back to the seqFiles or move the unmerged + *chunks in the seqFiles int temp files and replace the seqFiles with the temp files. + *3. remove unseqFiles + */ +public class MergeTask implements Callable { + + public static final String MERGE_SUFFIX = ".merge"; + private static final Logger logger = LoggerFactory.getLogger(MergeTask.class); + + MergeResource resource; + String storageGroupDir; + MergeLogger mergeLogger; + MergeContext mergeContext = new MergeContext(); + + private MergeCallback callback; + int concurrentMergeSeriesNum; + String taskName; + boolean fullMerge; + + MergeTask(List seqFiles, + List unseqFiles, String storageGroupDir, MergeCallback callback, + String taskName, boolean fullMerge) { +this.resource = new MergeResource(seqFiles, unseqFiles); +this.storageGroupDir = storageGroupDir; +this.callback = callback; +this.taskName = taskName; +this.fullMerge = fullMerge; +this.concurrentMergeSeriesNum = 1; + } + + public MergeTask(MergeResource mergeResource, String storageGroupDir, MergeCallback callback, + String taskName, boolean fullMerge, int concurrentMergeSeriesNum) { +this.resource = mergeResource; +this.storageGroupDir = storageGroupDir; +this.callback = callback; +this.taskName = taskName; +this.fullMerge = fullMerge; +this.concurrentMergeSeriesNum = concurrentMergeSeriesNum; + } + + @Override + public Void call() throws Exception { +try { + doMerge(); +} catch (Exception e) { + logger.error("Runtime exception in merge {}", taskName, e); + cleanUp(false); + // call the callback to make sure the StorageGroup exit merging status, but passing 2 + // empty file lists to avoid files being deleted. + callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME)); Review comment: When such situation emerges, the MergeLogger cannot be closed correctly and some fatal error must have happened. 
In such a situation, further merges should not proceed, so not calling the callback may be better.
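One reading of the reply above is that the callback should only run when cleanup itself succeeds: if cleanup fails, something fatal has happened and the storage group should not be released for further merges. The following is a minimal sketch of that control flow, not the MergeTask from the pull request. The class GuardedMergeSketch and its three Runnable fields are hypothetical stand-ins for doMerge(), cleanUp(false) and the merge callback.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch, not the MergeTask from the pull request: the three
// Runnables stand in for doMerge(), cleanUp(false) and the merge callback.
final class GuardedMergeSketch implements Callable<Void> {

  private final Runnable merge;
  private final Runnable cleanUp;
  private final Runnable callback;

  GuardedMergeSketch(Runnable merge, Runnable cleanUp, Runnable callback) {
    this.merge = merge;
    this.cleanUp = cleanUp;
    this.callback = callback;
  }

  @Override
  public Void call() {
    try {
      // The success path is assumed to notify the caller on its own.
      merge.run();
    } catch (RuntimeException e) {
      // If cleanUp throws, its exception propagates and the callback below is
      // never reached, so the caller stays in its "merging" state.
      cleanUp.run();
      // Cleanup succeeded: let the caller leave the merging state.
      callback.run();
    }
    return null;
  }
}
```

With this shape, a failed merge followed by a clean cleanup still releases the caller, while a failed cleanup leaves it blocked, which matches the intent that further merges should not proceed.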
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314571495 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge.manage; Review comment: It is also used in selectors, so no. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314571438 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String mea
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314570343 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String mea
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314570087 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String mea
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314569883 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeResource manages files and caches of readers, writers, MeasurementSchemas and + * modifications to avoid unnecessary object creations and file openings. + */ +public class MergeResource { + + private static final Logger logger = LoggerFactory.getLogger(MergeResource.class); + + private List seqFiles; + private List unseqFiles; + + private QueryContext mergeContext = new QueryContext(); + + private Map fileReaderCache; Review comment: This coupling hardly brings any merit. 
It may save some time reading TsFileMetadata, but that saving is relatively small compared to the cost of the MergeTask itself. The chance that you are merging a file that is being queried is also not that big, so I prefer to decouple it from other modules.
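The decoupling argued for above amounts to each merge owning a private reader cache instead of sharing one with the query engine. A minimal sketch of such a cache follows. ReaderCacheSketch, its opener function and the use of a String path as the key are assumptions made for illustration, not the fileReaderCache field of MergeResource.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical per-merge reader cache: each file is opened at most once per
// merge and every reader is closed when the merge ends.
final class ReaderCacheSketch<R extends Closeable> {

  private final Map<String, R> readers = new HashMap<>();
  private final Function<String, R> opener;

  ReaderCacheSketch(Function<String, R> opener) {
    this.opener = opener;
  }

  /** Returns the cached reader for the path, opening it on first use. */
  R get(String path) {
    return readers.computeIfAbsent(path, opener);
  }

  /** Closes all cached readers and empties the cache. */
  void closeAll() {
    try {
      for (R reader : readers.values()) {
        reader.close();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      readers.clear();
    }
  }
}
```

Because the cache is private to one merge, its lifetime is simply the lifetime of that merge, at the cost of reading file metadata again even when a query has already loaded it.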
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314568335 ## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ## @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.merge.manage; + +import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader; +import org.apache.iotdb.db.utils.MergeUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer; +import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MergeResource manages files and caches of readers, writers, MeasurementSchemas and + * modifications to avoid unnecessary object creations and file openings. + */ +public class MergeResource { + + private static final Logger logger = LoggerFactory.getLogger(MergeResource.class); + + private List seqFiles; + private List unseqFiles; + + private QueryContext mergeContext = new QueryContext(); Review comment: Removed. This is an automated message from the Apache Git Service. 
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314568143

## File path: server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java ##
@@ -39,7 +41,7 @@
   /**
    * key: Tsfile path. value: TsFileMetaData
    */
-  private LRULinkedHashMap cache;
+  private LRULinkedHashMap cache;

Review comment:
We do not have two TsFileResource for the same TsFile, so it is fine.
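The reply above relies on there being exactly one resource object per file. Assuming the cache is keyed by the resource object itself (the generic parameters are lost in the quoted diff), a key class that does not override equals/hashCode is looked up by identity, so a second object for the same file would miss the cache. The sketch below illustrates this with a small LRU map built on java.util.LinkedHashMap. FileHandleSketch and LruSketch are hypothetical names and are not the LRULinkedHashMap from the pull request.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a file resource object; it deliberately does not
// override equals/hashCode, so map lookups use object identity.
final class FileHandleSketch {
  final String path;

  FileHandleSketch(String path) {
    this.path = path;
  }
}

// Minimal LRU map built on java.util.LinkedHashMap in access order.
final class LruSketch<K, V> extends LinkedHashMap<K, V> {
  private static final long serialVersionUID = 1L;

  private final int capacity;

  LruSketch(int capacity) {
    super(16, 0.75f, true); // true = keep entries in access order
    this.capacity = capacity;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    return size() > capacity; // evict the least recently used entry
  }
}
```

As long as the invariant of one object per file holds, identity-based keys behave the same as path-based keys; if it were ever broken, the only symptom would be extra cache misses rather than wrong data.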
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314568004

## File path: server/iotdb/conf/iotdb-engine.properties ##
@@ -155,7 +155,43 @@ concurrent_flush_thread=0
 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory
 # (i.e., whether use ChunkBufferPool), value true, false
-chunk_buffer_pool_enable = false
+chunk_buffer_pool_enable=false
+
+
+### Merge Configurations
+
+
+# How many thread will be set up to perform merges, 1 by default.
+# Set to 1 when less than or equal to 0.
+merge_thread_num=1
+
+# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default.
+# This is only a rough estimation, starting from a relatively small value to avoid OOM.
+# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the
+# total memory estimation of merge.
+# merge_memory_budget=2147483648
+
+# When set to true, if some crashed merges are detected during system rebooting, such merges will
+# be continued, otherwise, the unfinished parts of such merges will not be continued while the
+# finished parts still remains as they are.
+# If you are feeling the rebooting is too slow, set this to false, false by default
+continue_merge_after_reboot=false
+
+# A global merge will be performed each such interval, that is, each storage group will be merged
+# (if proper merge candidates can be found). Unit: second, default: 1hours.
+# When less than or equal to 0, timed merge is disabled.
+merge_interval_sec=3600
+
+# When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how

Review comment:
No overflow, no merge. The main and certain benefit of doing a merge is to make the unsequential files sequential. Physically removing the deleted data is fine, but it would be considered future work.
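The rules stated in the quoted property comments (merge_thread_num is set to 1 when it is less than or equal to 0, timed merge is disabled when merge_interval_sec is less than or equal to 0, and the total memory estimate is merge_thread_num * merge_memory_budget) could be sketched as follows. This is only an illustration using java.util.Properties. The class MergeConfigSketch and its methods are hypothetical and are not taken from the pull request.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, not part of the pull request: it only illustrates the
// rules stated in the quoted property comments.
final class MergeConfigSketch {

  private MergeConfigSketch() {
    // util class
  }

  /** Returns merge_thread_num, falling back to 1 when the value is <= 0 or absent. */
  static int mergeThreadNum(Properties props) {
    int n = Integer.parseInt(props.getProperty("merge_thread_num", "1").trim());
    return n <= 0 ? 1 : n;
  }

  /** Returns true when a timed merge should be scheduled, i.e. the interval is > 0. */
  static boolean timedMergeEnabled(Properties props) {
    long sec = Long.parseLong(props.getProperty("merge_interval_sec", "3600").trim());
    return sec > 0;
  }

  /** Rough total memory estimate: one budget per merge thread. */
  static long totalMemoryEstimate(int threadNum, long memoryBudgetPerTask) {
    return Math.multiplyExact((long) threadNum, memoryBudgetPerTask);
  }

  /** Parses property text held in memory, for example the body of a test fixture. */
  static Properties parse(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
    return props;
  }
}
```

For example, with merge_thread_num=2 and the commented-out budget of 2147483648 bytes, the estimate is 4294967296 bytes, which is why the quoted comment advises starting from a relatively small value.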
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314567286 ## File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java ## @@ -229,6 +230,45 @@ */ private boolean chunkBufferPoolEnable = false; + /** + * How much memory (in byte) can be used by a single merge task. + */ + private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2); + + /** + * How many thread will be set up to perform merges. + */ + private int mergeThreadNum = 1; + + /** + * When set to true, if some crashed merges are detected during system rebooting, such merges will + * be continued, otherwise, the unfinished parts of such merges will not be continued while the + * finished parts still remain as they are. + */ + private boolean continueMergeAfterReboot = true; + + /** + * A global merge will be performed each such interval, that is, each storage group will be merged + * (if proper merge candidates can be found). Unit: second. + */ + private long mergeIntervalSec = 2 * 3600L; + + /** + * When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how + * much they are overflowed). This may increase merge overhead depending on how much the SeqFiles + * are overflowed. + */ + private boolean forceFullMerge = false; + + /** + * During a merge, if a chunk with less number of chunks than this parameter, the chunk will be + * merged with its succeeding chunks even if it is not overflowed, until the merged chunks reach + * this threshold and the new chunk will be flushed. + */ + private int chunkMergePointThreshold = 512; + + private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.MAX_SERIES_NUM; Review comment: This parameter is experimental and not intended for users. Developers may change it by editing the source code.
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314566945 ## File path: server/iotdb/conf/iotdb-engine.properties ## @@ -155,7 +155,43 @@ concurrent_flush_thread=0 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory # (i.e., whether use ChunkBufferPool), value true, false -chunk_buffer_pool_enable = false +chunk_buffer_pool_enable=false + + +### Merge Configurations + + +# How many thread will be set up to perform merges, 1 by default. +# Set to 1 when less than or equal to 0. +merge_thread_num=1 + +# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default. +# This is only a rough estimation, starting from a relatively small value to avoid OOM. +# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the +# total memory estimation of merge. +# merge_memory_budget=2147483648 + +# When set to true, if some crashed merges are detected during system rebooting, such merges will +# be continued, otherwise, the unfinished parts of such merges will not be continued while the +# finished parts still remains as they are. +# If you are feeling the rebooting is too slow, set this to false, false by default +continue_merge_after_reboot=false + +# A global merge will be performed each such interval, that is, each storage group will be merged +# (if proper merge candidates can be found). Unit: second, default: 1hours. +# When less than or equal to 0, timed merge is disabled. +merge_interval_sec=3600 + +# When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how +# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles +# are overflowed. 
+force_full_merge=false + +# During a merge, if a chunk with less number of chunks than this parameter, the chunk will be Review comment: fixed
[GitHub] [incubator-iotdb] jt2594838 commented on issue #258: [IOTDB-143]Development of merge
jt2594838 commented on issue #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#issuecomment-521864538
1. I have provided a config to perform quick recovery. Since existing files are modified, the recovery cannot be delayed, or the file status may be incorrect.
2. We lack a compatibility mechanism, which is definitely a BIG job. I think we should consider it later instead of mixing it in here.
3. The design document is merely an introduction to my idea. As long as it does not conflict with the methodology behind it, I do not think I should explain each line of code there. If you follow the call stack instead of reading the source file from top to bottom, you will find that most of the fields are only used in a few places. There are not many branches in that file and it is almost a straight line, but if you do not follow it, it is easy to get lost.
[GitHub] [incubator-iotdb] jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge
jt2594838 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314565013 ## File path: server/iotdb/conf/iotdb-engine.properties ## @@ -155,7 +155,43 @@ concurrent_flush_thread=0 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory # (i.e., whether use ChunkBufferPool), value true, false -chunk_buffer_pool_enable = false +chunk_buffer_pool_enable=false + + +### Merge Configurations + + +# How many thread will be set up to perform merges, 1 by default. +# Set to 1 when less than or equal to 0. +merge_thread_num=1 + +# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default. +# This is only a rough estimation, starting from a relatively small value to avoid OOM. +# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the +# total memory estimation of merge. +# merge_memory_budget=2147483648 + +# When set to true, if some crashed merges are detected during system rebooting, such merges will +# be continued, otherwise, the unfinished parts of such merges will not be continued while the +# finished parts still remains as they are. +# If you are feeling the rebooting is too slow, set this to false, false by default +continue_merge_after_reboot=false Review comment: I think I have written "the unfinished parts of such merges will not be continued".
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312745476 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter; +import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MergeUtils { + + private static final Logger logger = LoggerFactory.getLogger(MergeUtils.class); + + private MergeUtils() { +// util class + } + + public static void writeTVPair(TimeValuePair timeValuePair, IChunkWriter chunkWriter) { +switch (chunkWriter.getDataType()) { + case TEXT: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary()); +break; + case DOUBLE: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); +break; + case BOOLEAN: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean()); +break; + case INT64: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getLong()); +break; + case INT32: +chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getInt()); +break; + case FLOAT: 
+chunkWriter.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getFloat()); +break; + default: +throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); +} + } + + /** + * Collect all paths contained in the all SeqFiles and UnseqFiles in a merge and sort them + * before return. + * @param resource + * @return all paths contained in the merge. + * @throws IOException + */ + public static List collectPaths(MergeResource resource) + throws IOException { +Set pathSet = new HashSet<>(); +for (TsFileResource tsFileResource : resource.getUnseqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +for (TsFileResource tsFileResource : resource.getSeqFiles()) { + TsFileSequenceReader sequenceReader = resource.getFileReader(tsFileResource); + resource.getMeasurementSchemaMap().putAll(sequenceReader.readFileMetadata().getMeasurementSchema()); + pathSet.addAll(collectFileSeries(sequenceReader)); +} +List ret = new ArrayList<>(pathSet); +ret.sort(Comparator.comparing(Path::getFullPath)); +return ret; + } + + private static List collectFileSeries(TsFileSequenceReader sequenceReader) throws IOException { +TsFileMetaData metaData = sequenceReader.readFileMetadata(); +Set deviceIds = metaData.getDeviceMap().keySet(); +Set measurements = metaData.getMeasurementSchema().keySet(); +List paths = new ArrayList<>(); +for (String deviceId : deviceIds) { + for (String me
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312740345 ## File path: server/src/main/java/org/apache/iotdb/db/engine/cache/TsFileMetaDataCache.java ## @@ -39,7 +41,7 @@ /** * key: Tsfile path. value: TsFileMetaData */ - private LRULinkedHashMap cache; + private LRULinkedHashMap cache; Review comment: Is it OK to use TsFileResource as the key? (You need to make sure that the instances are the same in memory.)
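The concern about instances being "the same in memory" can be illustrated with a plain `HashMap`. This is a sketch under assumptions: `IdentityKey` and `PathKey` are hypothetical stand-ins, and nothing here claims how `TsFileResource` actually implements `equals` or `hashCode`. A key type that inherits `Object.equals` only hits the cache when the very same instance is used for the lookup, whereas a key type with path-based equality hits for any equal instance.

```java
import java.util.HashMap;
import java.util.Map;

final class CacheKeySketch {

  /** Hypothetical key that does NOT override equals/hashCode (identity semantics). */
  static final class IdentityKey {
    final String path;

    IdentityKey(String path) {
      this.path = path;
    }
  }

  /** Hypothetical key whose equality is defined by the file path. */
  static final class PathKey {
    final String path;

    PathKey(String path) {
      this.path = path;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof PathKey && ((PathKey) o).path.equals(path);
    }

    @Override
    public int hashCode() {
      return path.hashCode();
    }
  }

  /** Looks up with a second, distinct instance describing the same file. */
  static boolean hitWithIdentityKey() {
    Map<IdentityKey, String> cache = new HashMap<>();
    cache.put(new IdentityKey("a.tsfile"), "metadata");
    return cache.containsKey(new IdentityKey("a.tsfile"));
  }

  /** Same lookup, but the key type compares by path. */
  static boolean hitWithPathKey() {
    Map<PathKey, String> cache = new HashMap<>();
    cache.put(new PathKey("a.tsfile"), "metadata");
    return cache.containsKey(new PathKey("a.tsfile"));
  }

  public static void main(String[] args) {
    System.out.println("identity key hit: " + hitWithIdentityKey());
    System.out.println("path key hit:     " + hitWithPathKey());
  }
}
```

With identity semantics the cache silently misses whenever two objects describe the same file, which is why the reviewer asks for a guarantee that only one instance per file exists.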
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314331850 ## File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java ## @@ -185,53 +204,95 @@ public StorageGroupProcessor(String systemInfoDir, String storageGroupName) private void recover() throws ProcessorException { logger.info("recover Storage Group {}", storageGroupName); -// collect TsFiles from sequential data directory -List tsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders()); -recoverSeqFiles(tsFiles); - -// collect TsFiles from unsequential data directory -tsFiles = getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders()); -recoverUnseqFiles(tsFiles); +try { + // collect TsFiles from sequential and unsequential data directory + List seqTsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders()); + List unseqTsFiles = + getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders()); + + recoverSeqFiles(seqTsFiles); + recoverUnseqFiles(unseqTsFiles); + + String taskName = storageGroupName + "-" + System.currentTimeMillis(); + File mergingMods = new File(storageGroupSysDir, MERGING_MODIFICAITON_FILE_NAME); + if (mergingMods.exists()) { +mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME); + } + RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles, Review comment: If this job may block startup, I do not think it can be accepted, because a merge may be time-consuming. Please try to move it into the merge thread.
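The suggestion to move the recovery into the merge thread could look roughly like the sketch below, which hands a slow job to an `ExecutorService` so the caller is not blocked. All names (`BackgroundRecoverySketch`, `recoverMerge`, `submitRecovery`) are hypothetical and are not taken from the pull request. Note that the author's reply elsewhere in this thread argues the recovery cannot be delayed because existing files are modified, so this only illustrates the reviewer's proposal, not the adopted design.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BackgroundRecoverySketch {

  /** Hypothetical stand-in for a potentially slow merge recovery. */
  static String recoverMerge(String taskName) {
    return taskName + " recovered";
  }

  /** Submits the recovery to the given pool and returns immediately. */
  static Future<String> submitRecovery(ExecutorService mergePool, String taskName) {
    Callable<String> job = () -> recoverMerge(taskName);
    return mergePool.submit(job);
  }

  /** Convenience wrapper for the demo: runs the recovery in the background and waits for it. */
  static String recoverAndWait(String taskName) {
    ExecutorService mergePool = Executors.newSingleThreadExecutor();
    try {
      Future<String> pending = submitRecovery(mergePool, taskName);
      // Startup work that does not touch the files being recovered could run here.
      return pending.get(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException | TimeoutException e) {
      throw new IllegalStateException(e);
    } finally {
      mergePool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(recoverAndWait("sg1-merge"));
  }
}
```

The trade-off is that anything reading the affected files must wait on the returned `Future` before trusting their state.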
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312738934 ## File path: server/iotdb/conf/iotdb-engine.properties ## @@ -155,7 +155,43 @@ concurrent_flush_thread=0 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory # (i.e., whether use ChunkBufferPool), value true, false -chunk_buffer_pool_enable = false +chunk_buffer_pool_enable=false + + +### Merge Configurations + + +# How many thread will be set up to perform merges, 1 by default. +# Set to 1 when less than or equal to 0. +merge_thread_num=1 + +# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default. +# This is only a rough estimation, starting from a relatively small value to avoid OOM. +# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the +# total memory estimation of merge. +# merge_memory_budget=2147483648 + +# When set to true, if some crashed merges are detected during system rebooting, such merges will +# be continued, otherwise, the unfinished parts of such merges will not be continued while the +# finished parts still remains as they are. +# If you are feeling the rebooting is too slow, set this to false, false by default +continue_merge_after_reboot=false + +# A global merge will be performed each such interval, that is, each storage group will be merged +# (if proper merge candidates can be found). Unit: second, default: 1hours. +# When less than or equal to 0, timed merge is disabled. +merge_interval_sec=3600 + +# When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how +# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles +# are overflowed. 
+force_full_merge=false + +# During a merge, if a chunk with less number of chunks than this parameter, the chunk will be Review comment: > if a chunk with less number of chunks than this parameter Should this read "if a chunk with less number of points than this parameter"?
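Read with the reviewer's correction ("points" rather than "chunks"), the threshold rule says that a small chunk keeps absorbing its successors until the accumulated point count reaches the threshold, and then the combined chunk is flushed. The following is a simplified sketch of that grouping only; the class name is hypothetical and the real merge also has to deal with overflowed and modified chunks, which are ignored here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SmallChunkMergeSketch {

  /**
   * Groups consecutive chunk sizes (in points) so that each flushed chunk holds at least
   * `threshold` points. A trailing remainder below the threshold is flushed as-is.
   */
  static List<Integer> mergeSmallChunks(List<Integer> chunkPointCounts, int threshold) {
    List<Integer> flushed = new ArrayList<>();
    int pending = 0;
    for (int points : chunkPointCounts) {
      pending += points;
      if (pending >= threshold) {
        flushed.add(pending);
        pending = 0;
      }
    }
    if (pending > 0) {
      flushed.add(pending);
    }
    return flushed;
  }

  public static void main(String[] args) {
    // With a threshold of 512 the first three chunks are combined into one of 600 points.
    System.out.println(mergeSmallChunks(Arrays.asList(100, 200, 300, 600, 50), 512));
  }
}
```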
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314345112 ## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetaData.java ## @@ -151,6 +159,8 @@ public static TsFileMetaData deserializeFrom(ByteBuffer buffer) throws IOExcepti if (ReadWriteIOUtils.readIsNull(buffer)) { fileMetaData.createdBy = ReadWriteIOUtils.readString(buffer); } +fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(buffer); Review comment: to keep compatibility, you'd better deserialize like this:
```
if (tsfile.version == 0.8.0) {
  totalChunkNumber = ?;
  invalidChunkNum = 0;
} else {
  fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(buffer);
  ...
}
```
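A runnable variant of the reviewer's pseudocode is sketched below using only `java.nio.ByteBuffer`. The version numbers, the `Counters` holder, and the `-1` placeholder for an unknown total are all assumptions made for illustration; the review leaves the legacy value of the total open (`?`), and TsFile's real version handling may differ.

```java
import java.nio.ByteBuffer;

final class VersionedMetadataSketch {

  // Hypothetical version numbering; the real format may encode versions differently.
  static final int LEGACY_VERSION = 1;
  static final int CURRENT_VERSION = 2;

  /** Placeholder for "not recorded in the file"; an assumption, not the reviewer's value. */
  static final int UNKNOWN = -1;

  /** Hypothetical holder for the two counters added by the new format. */
  static final class Counters {
    final int totalChunkNum;
    final int invalidChunkNum;

    Counters(int totalChunkNum, int invalidChunkNum) {
      this.totalChunkNum = totalChunkNum;
      this.invalidChunkNum = invalidChunkNum;
    }
  }

  /** Reads the counters only when the file version actually contains them. */
  static Counters readCounters(ByteBuffer buffer, int formatVersion) {
    if (formatVersion < CURRENT_VERSION) {
      // Legacy files never wrote these fields, so nothing is consumed from the buffer.
      return new Counters(UNKNOWN, 0);
    }
    int total = buffer.getInt();
    int invalid = buffer.getInt();
    return new Counters(total, invalid);
  }

  public static void main(String[] args) {
    ByteBuffer current = ByteBuffer.allocate(8);
    current.putInt(7).putInt(2).flip();
    Counters c = readCounters(current, CURRENT_VERSION);
    System.out.println(c.totalChunkNum + " chunks, " + c.invalidChunkNum + " invalid");
  }
}
```

The key point is that the legacy branch must not consume bytes from the buffer, otherwise every field that follows would be read from the wrong offset.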
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313206195 ## File path: server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java ## @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312738889 ## File path: server/iotdb/conf/iotdb-engine.properties ## @@ -155,7 +155,43 @@ concurrent_flush_thread=0 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory # (i.e., whether use ChunkBufferPool), value true, false -chunk_buffer_pool_enable = false +chunk_buffer_pool_enable=false + + +### Merge Configurations + + +# How many thread will be set up to perform merges, 1 by default. +# Set to 1 when less than or equal to 0. +merge_thread_num=1 + +# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default. +# This is only a rough estimation, starting from a relatively small value to avoid OOM. +# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the +# total memory estimation of merge. +# merge_memory_budget=2147483648 + +# When set to true, if some crashed merges are detected during system rebooting, such merges will +# be continued, otherwise, the unfinished parts of such merges will not be continued while the +# finished parts still remains as they are. +# If you are feeling the rebooting is too slow, set this to false, false by default +continue_merge_after_reboot=false Review comment: This is hard to understand: if it is false, will the unfinished task be continued after startup?
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314345452 ## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetaData.java ## @@ -108,6 +114,8 @@ public static TsFileMetaData deserializeFrom(InputStream inputStream) throws IOE if (ReadWriteIOUtils.readIsNull(inputStream)) { fileMetaData.createdBy = ReadWriteIOUtils.readString(inputStream); } +fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(inputStream); Review comment: keep compatibility
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314338342 ## File path: server/src/main/java/org/apache/iotdb/db/service/ServiceType.java ## @@ -31,6 +31,7 @@ AUTHORIZATION_SERVICE("Authorization ServerService", ""), FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""), SYNC_SERVICE("SYNC ServerService", ""), + MERGE_SERVICE("Merge Manager", ""), Review comment: It would be better to add a JMX module for this task.
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314340941 ## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java ## @@ -135,9 +135,9 @@ */ public static int pageCheckSizeThreshold = 100; /** - * Default endian value is LITTLE_ENDIAN. + * Default endian value is BIG_ENDIAN. Review comment: Also, is the file compatible with 0.8.0?
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314340102 ## File path: tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java ## @@ -135,9 +135,9 @@ */ public static int pageCheckSizeThreshold = 100; /** - * Default endian value is LITTLE_ENDIAN. + * Default endian value is BIG_ENDIAN. Review comment: It is OK, but why?
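The compatibility question behind the endianness change can be made concrete with `java.nio.ByteBuffer`, whose default byte order is `BIG_ENDIAN`. The sketch below (class and method names are hypothetical) shows that an integer written with one byte order is misread when decoded with the other. It says nothing about how TsFile itself reads or writes data, only why changing a default byte order raises the question of reading older files.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class EndianSketch {

  /** Serializes one int with the given byte order. */
  static byte[] encode(int value, ByteOrder order) {
    return ByteBuffer.allocate(Integer.BYTES).order(order).putInt(value).array();
  }

  /** Deserializes one int, interpreting the bytes with the given byte order. */
  static int decode(byte[] bytes, ByteOrder order) {
    return ByteBuffer.wrap(bytes).order(order).getInt();
  }

  public static void main(String[] args) {
    byte[] written = encode(1, ByteOrder.BIG_ENDIAN);
    System.out.println("read as big endian:    " + decode(written, ByteOrder.BIG_ENDIAN));
    System.out.println("read as little endian: " + decode(written, ByteOrder.LITTLE_ENDIAN));
  }
}
```

Reading the big-endian bytes `00 00 00 01` as little-endian yields 16777216 instead of 1, so a reader and writer that disagree on the order silently corrupt every multi-byte value.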
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r312744740

## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java ##
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.manage;
+
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader;
+import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeResource manages files and caches of readers, writers, MeasurementSchemas and
+ * modifications to avoid unnecessary object creations and file openings.
+ */
+public class MergeResource {
+
+  private static final Logger logger = LoggerFactory.getLogger(MergeResource.class);
+
+  private List seqFiles;
+  private List unseqFiles;
+
+  private QueryContext mergeContext = new QueryContext();

Review comment:
Remove this, or rename it?
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r313197900

## File path: server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java ##
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge.task;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.iotdb.db.engine.merge.manage.MergeContext;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.MergeUtils;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MergeTask merges given seqFiles and unseqFiles into a new one, which basically consists of three
+ * steps: 1. rewrite overflowed, modified or small-sized chunks into temp merge files
+ *        2. move the merged chunks in the temp files back to the seqFiles or move the unmerged
+ *        chunks in the seqFiles into temp files and replace the seqFiles with the temp files.
+ *        3. remove unseqFiles
+ */
+public class MergeTask implements Callable {
+
+  public static final String MERGE_SUFFIX = ".merge";
+  private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
+
+  MergeResource resource;
+  String storageGroupDir;
+  MergeLogger mergeLogger;
+  MergeContext mergeContext = new MergeContext();
+
+  private MergeCallback callback;
+  int concurrentMergeSeriesNum;
+  String taskName;
+  boolean fullMerge;
+
+  MergeTask(List seqFiles,
+      List unseqFiles, String storageGroupDir, MergeCallback callback,
+      String taskName, boolean fullMerge) {
+    this.resource = new MergeResource(seqFiles, unseqFiles);
+    this.storageGroupDir = storageGroupDir;
+    this.callback = callback;
+    this.taskName = taskName;
+    this.fullMerge = fullMerge;
+    this.concurrentMergeSeriesNum = 1;
+  }
+
+  public MergeTask(MergeResource mergeResource, String storageGroupDir, MergeCallback callback,
+      String taskName, boolean fullMerge, int concurrentMergeSeriesNum) {
+    this.resource = mergeResource;
+    this.storageGroupDir = storageGroupDir;
+    this.callback = callback;
+    this.taskName = taskName;
+    this.fullMerge = fullMerge;
+    this.concurrentMergeSeriesNum = concurrentMergeSeriesNum;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    try {
+      doMerge();
+    } catch (Exception e) {
+      logger.error("Runtime exception in merge {}", taskName, e);
+      cleanUp(false);
+      // call the callback to make sure the StorageGroup exit merging status, but passing 2
+      // empty file lists to avoid files being deleted.
+      callback.call(Collections.emptyList(), Collections.emptyList(), new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME));

Review comment:
Because `cleanUp(false)` may throw an IOException, `callback.call()` may be skipped, in which case the StorageGroup would not be told to exit merging status.
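The concern in the review comment on `MergeTask.call()` can be illustrated with a small sketch. It is not the PR's code: `Callback`, `Cleanup`, and `handleFailure` are hypothetical, simplified stand-ins (the real `MergeCallback.call` takes two file lists and a log file). The sketch only shows one possible way to make sure the callback still runs when the cleanup step throws, by moving the callback into a `finally` block.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class CallbackGuardSketch {

  /** Hypothetical stand-in for MergeCallback; the real one takes file lists and a log file. */
  interface Callback {
    void call();
  }

  /** Hypothetical stand-in for a cleanup step that may fail, as cleanUp(false) may. */
  interface Cleanup {
    void run() throws IOException;
  }

  /** Error path of a task: attempt the cleanup, but always notify the callback. */
  static void handleFailure(Cleanup cleanUp, Callback callback, List<String> log) {
    try {
      cleanUp.run();
    } catch (IOException e) {
      // Record the cleanup failure instead of letting it escape before the callback.
      log.add("cleanup failed: " + e.getMessage());
    } finally {
      // Runs whether or not the cleanup threw.
      callback.call();
    }
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    int[] calls = {0};
    handleFailure(() -> { throw new IOException("disk error"); }, () -> calls[0]++, log);
    System.out.println("callback calls = " + calls[0] + ", log = " + log);
  }
}
```

Whether a cleanup failure should be logged and swallowed, as here, or rethrown after the callback has run is a design choice the review comment leaves open.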
[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
jixuan1989 commented on a change in pull request #258: [IOTDB-143]Development of merge
URL: https://github.com/apache/incubator-iotdb/pull/258#discussion_r314337283

## File path: server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java ##
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.chunkRelated;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TimeValuePairUtils;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+
+public class CachedDiskChunkReader implements IPointReader {

Review comment:
I see. How about implementing this class by inheriting from DiskChunkReader?
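A rough sketch of what the inheritance suggestion could look like follows. It deliberately avoids the real IoTDB types: `BaseReader` is a hypothetical stand-in for `DiskChunkReader`, `CachedReader` is a hypothetical stand-in for `CachedDiskChunkReader`, and the sketch assumes the cached variant differs mainly by buffering one element so that it can be inspected without being consumed, which the comment itself does not confirm.

```java
import java.util.Arrays;
import java.util.Iterator;

final class CachedReaderSketch {

  /** Hypothetical stand-in for DiskChunkReader: a plain forward-only reader. */
  static class BaseReader {
    private final Iterator<Long> source;

    BaseReader(Iterator<Long> source) {
      this.source = source;
    }

    boolean hasNext() {
      return source.hasNext();
    }

    long next() {
      return source.next();
    }
  }

  /** Hypothetical cached variant: reuses the base reader and buffers one element. */
  static class CachedReader extends BaseReader {
    private Long cached; // null means nothing is buffered

    CachedReader(Iterator<Long> source) {
      super(source);
    }

    @Override
    boolean hasNext() {
      return cached != null || super.hasNext();
    }

    /** Returns the next element without consuming it. */
    long current() {
      if (cached == null) {
        cached = super.next();
      }
      return cached;
    }

    @Override
    long next() {
      long value = current();
      cached = null; // the buffered element is now consumed
      return value;
    }
  }

  public static void main(String[] args) {
    CachedReader reader = new CachedReader(Arrays.asList(1L, 2L, 3L).iterator());
    System.out.println("peek: " + reader.current());
    System.out.println("read: " + reader.next());
    System.out.println("read: " + reader.next());
  }
}
```

With this shape only the buffering logic lives in the subclass; whether that fits the real classes depends on how `DiskChunkReader` exposes its reading methods.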