JackieTien97 commented on a change in pull request #652: [386] Vectorize the raw data query process URL: https://github.com/apache/incubator-iotdb/pull/652#discussion_r361115796
########## File path: server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader.resourceRelated; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.query.reader.chunkRelated.ChunkReaderWrap; +import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader; +import org.apache.iotdb.db.utils.QueryUtils; +import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import 
org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.IBatchReader; + +/** + * To read a list of unsequence TsFiles, this class implements <code>IBatchReader</code> for the + * TsFiles. Note that an unsequence TsFile can be either closed or unclosed. An unclosed unsequence + * TsFile consists of data on disk and data in memtables that will be flushed to this unclosed + * TsFile. This class is used in {@link org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFilter}. + */ +public class NewUnseqResourceMergeReader implements IBatchReader { + + private PriorityMergeReader priorityMergeReader = new PriorityMergeReader(); + private List<ChunkMetaData> chunkMetaDataList = new ArrayList<>(); + private Filter timeFilter; + private int index = 0; // used to index current metadata in metaDataList + + private static final int DEFAULT_BATCH_DATA_SIZE = 10000; + + private BatchData batchData; + private TSDataType dataType; + + /** + * prepare metaDataList + */ + public NewUnseqResourceMergeReader(Path seriesPath, TSDataType dataType, + List<TsFileResource> unseqResources, QueryContext context, Filter filter) throws IOException { + + this.dataType = dataType; + this.timeFilter = filter; + int priority = 1; + + // get all ChunkMetadata + for (TsFileResource tsFileResource : unseqResources) { + + // if unseq tsfile is closed or has flushed chunk groups, then endtime map is not empty + if (!tsFileResource.getEndTimeMap().isEmpty()) { + if (!ResourceRelatedUtil.isTsFileSatisfied(tsFileResource, timeFilter, seriesPath)) { + continue; + } + } + + /* + * handle disk chunks of closed or unclosed file + */ + List<ChunkMetaData> currentChunkMetaDataList; + if (tsFileResource.isClosed()) { + // get chunk metadata list of current closed tsfile + currentChunkMetaDataList = 
DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath); + List<Modification> pathModifications = context + .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath()); + if (!pathModifications.isEmpty()) { + QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications); + } + } else { + // metadata list of already flushed chunk groups + currentChunkMetaDataList = tsFileResource.getChunkMetaDataList(); + } + + if (!currentChunkMetaDataList.isEmpty()) { + TsFileSequenceReader tsFileReader = FileReaderManager.getInstance() + .get(tsFileResource, tsFileResource.isClosed()); + ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileReader); + + for (ChunkMetaData chunkMetaData : currentChunkMetaDataList) { + if (timeFilter == null || timeFilter.satisfy(chunkMetaData.getStatistics())) { + chunkMetaData.setPriority(priority++); + chunkMetaData.setChunkLoader(chunkLoader); + chunkMetaDataList.add(chunkMetaData); + } + } + } + + /* + * handle mem chunks of unclosed file + */ + if (!tsFileResource.isClosed()) { + ChunkReaderWrap memChunkReaderWrap = new ChunkReaderWrap( + tsFileResource.getReadOnlyMemChunk(), timeFilter); + priorityMergeReader.addReaderWithPriority(memChunkReaderWrap.getIPointReader(), priority++); + } + } + + // sort All ChunkMetadata by start time + chunkMetaDataList = chunkMetaDataList.stream() + .sorted(Comparator.comparing(ChunkMetaData::getStartTime)).collect(Collectors.toList()); Review comment: There is no need to use stream().sorted(); you can sort the list in place with chunkMetaDataList.sort(Comparator<? super E>), e.g. chunkMetaDataList.sort(Comparator.comparing(ChunkMetaData::getStartTime)). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
