JackieTien97 commented on code in PR #12539: URL: https://github.com/apache/iotdb/pull/12539#discussion_r1609132472
########## iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties: ########## @@ -20,8 +20,8 @@ timestamp_precision=ms data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus -schema_replication_factor=3 -data_replication_factor=3 Review Comment: change them back ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl; + +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractChunkOffset; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractDeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AlignedDeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.ChunkOffset; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.DeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; +import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator; +import org.apache.iotdb.db.utils.ModificationUtils; + +import org.apache.tsfile.file.metadata.AlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.statistics.Statistics; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +public class UnclosedFileScanHandleImpl implements IFileScanHandle { + + private final TsFileResource tsFileResource; + private final Map<IDeviceID, Map<String, List<IChunkMetadata>>> deviceToChunkMetadataMap; + private final Map<IDeviceID, Map<String, List<IChunkHandle>>> deviceToMemChunkHandleMap; + + public UnclosedFileScanHandleImpl( + Map<IDeviceID, Map<String, List<IChunkMetadata>>> deviceToChunkMetadataMap, + Map<IDeviceID, Map<String, List<IChunkHandle>>> deviceToMemChunkHandleMap, + TsFileResource tsFileResource) { + this.deviceToChunkMetadataMap = deviceToChunkMetadataMap; + this.deviceToMemChunkHandleMap = deviceToMemChunkHandleMap; + this.tsFileResource = tsFileResource; + } + + @Override + public TsFileDeviceStartEndTimeIterator getDeviceStartEndTimeIterator() throws IOException { + ITimeIndex timeIndex = tsFileResource.getTimeIndex(); + return timeIndex instanceof DeviceTimeIndex + ? new TsFileDeviceStartEndTimeIterator((DeviceTimeIndex) timeIndex) + : new TsFileDeviceStartEndTimeIterator(tsFileResource.buildDeviceTimeIndex()); + } + + @Override + public boolean[] isDeviceTimeDeleted(IDeviceID deviceID, long[] timeArray) { + Map<String, List<IChunkMetadata>> chunkMetadataMap = deviceToChunkMetadataMap.get(deviceID); + boolean[] result = new boolean[timeArray.length]; + + chunkMetadataMap.values().stream() + .flatMap(List::stream) + .map(IChunkMetadata::getDeleteIntervalList) + .filter(deleteIntervalList -> !deleteIntervalList.isEmpty()) + .forEach( + timeRangeList -> { + Integer deleteCursor = 0; + IntStream.range(0, timeArray.length) + .forEach( + i -> { + if (!result[i] + && ModificationUtils.isPointDeleted( + timeArray[i], timeRangeList, deleteCursor)) { + result[i] = true; + } + }); + }); + return result; + } + + @Override + public Iterator<AbstractDeviceChunkMetaData> getAllDeviceChunkMetaData() throws IOException { + List<AbstractDeviceChunkMetaData> deviceChunkMetaDataList = new ArrayList<>(); + for (Map.Entry<IDeviceID, Map<String, List<IChunkMetadata>>> entry : + deviceToChunkMetadataMap.entrySet()) { + IDeviceID deviceID = entry.getKey(); + Map<String, List<IChunkMetadata>> chunkMetadataList = entry.getValue(); + if (chunkMetadataList.isEmpty()) { + continue; + } + + boolean isAligned = chunkMetadataList.containsKey(""); + if (isAligned) { + List<AlignedChunkMetadata> alignedChunkMetadataList = new ArrayList<>(); + List<IChunkMetadata> timeChunkMetadataList = chunkMetadataList.get(""); + List<List<IChunkMetadata>> valueChunkMetadataList = + new ArrayList<>(chunkMetadataList.values()); + for (int i = 0; i < timeChunkMetadataList.size(); i++) { + alignedChunkMetadataList.add( + new AlignedChunkMetadata( + timeChunkMetadataList.get(i), valueChunkMetadataList.get(i))); + } + deviceChunkMetaDataList.add( + new AlignedDeviceChunkMetaData(deviceID, alignedChunkMetadataList)); + } else { + for (Map.Entry<String, List<IChunkMetadata>> measurementMetaData : + chunkMetadataList.entrySet()) { + deviceChunkMetaDataList.add( + new DeviceChunkMetaData(deviceID, measurementMetaData.getValue())); + } + } + } + return deviceChunkMetaDataList.iterator(); + } + + @Override + public boolean[] isTimeSeriesTimeDeleted( + IDeviceID deviceID, String timeSeriesName, long[] timeArray) { + List<IChunkMetadata> chunkMetadataList = + deviceToChunkMetadataMap.get(deviceID).get(timeSeriesName); + boolean[] result = new boolean[timeArray.length]; + chunkMetadataList.stream() + .map(IChunkMetadata::getDeleteIntervalList) + .filter(deleteIntervalList -> !deleteIntervalList.isEmpty()) + .forEach( + timeRangeList -> { + Integer deleteCursor = 0; + IntStream.range(0, timeArray.length) Review Comment: same as above ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java: ########## @@ -1925,17 +1901,186 @@ private List<TsFileResource> getFileResourceListForQuery( closeQueryLock.readLock().lock(); try { if (tsFileResource.isClosed()) { - tsfileResourcesForQuery.add(tsFileResource); + tsFileResourcesForQuery.add(tsFileResource); } else { - tsFileResource.getProcessor().query(pathList, context, tsfileResourcesForQuery); + tsFileResource.getProcessor().query(pathList, context, tsFileResourcesForQuery); } } catch (IOException e) { throw new MetadataException(e); } finally { closeQueryLock.readLock().unlock(); } } - return tsfileResourcesForQuery; + return tsFileResourcesForQuery; + } + + @Override + public IQueryDataSource queryForSeriesRegionScan( + List<PartialPath> pathList, + QueryContext queryContext, + Filter globalTimeFilter, + List<Long> timePartitions) + throws QueryProcessException { + try { + List<IFileScanHandle> seqFileScanHandles = + getFileHandleListForQuery( + tsFileManager.getTsFileList(true, timePartitions, globalTimeFilter), + pathList, + queryContext, + globalTimeFilter, + true); + List<IFileScanHandle> unseqFileScanHandles = + getFileHandleListForQuery( + tsFileManager.getTsFileList(false, timePartitions, globalTimeFilter), + pathList, + queryContext, + globalTimeFilter, + false); + + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, seqFileScanHandles.size()); + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum( + UNSEQUENCE_TSFILE, unseqFileScanHandles.size()); + + QueryDataSourceForRegionScan dataSource = + new QueryDataSourceForRegionScan(seqFileScanHandles, unseqFileScanHandles); + dataSource.setDataTTL(dataTTL); + return dataSource; + } catch (MetadataException e) { + throw new QueryProcessException(e); + } + } + + private List<IFileScanHandle> getFileHandleListForQuery( + Collection<TsFileResource> tsFileResources, + List<PartialPath> partialPaths, + QueryContext context, + Filter globalTimeFilter, + boolean isSeq) + throws MetadataException { + + List<IFileScanHandle> fileScanHandles = new ArrayList<>(); + + long timeLowerBound = + dataTTL != Long.MAX_VALUE ? CommonDateTimeUtils.currentTime() - dataTTL : Long.MIN_VALUE; + context.setQueryTimeLowerBound(timeLowerBound); + + for (TsFileResource tsFileResource : tsFileResources) { + if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq, dataTTL, context.isDebug())) { + continue; + } + closeQueryLock.readLock().lock(); + try { + if (tsFileResource.isClosed()) { + fileScanHandles.add(new ClosedFileScanHandleImpl(tsFileResource, context)); + } else { + tsFileResource + .getProcessor() + .queryForSeriesRegionScan(partialPaths, context, fileScanHandles); + } + } finally { + closeQueryLock.readLock().unlock(); + } + } + return fileScanHandles; + } + + @Override + public IQueryDataSource queryForDeviceRegionScan( + Map<IDeviceID, Boolean> devicePathToAligned, + QueryContext queryContext, + Filter globalTimeFilter, + List<Long> timePartitions) + throws QueryProcessException { + try { + List<IFileScanHandle> seqFileScanHandles = + getFileHandleListForQuery( + tsFileManager.getTsFileList(true, timePartitions, globalTimeFilter), + devicePathToAligned, + queryContext, + globalTimeFilter, + true); + List<IFileScanHandle> unseqFileScanHandles = + getFileHandleListForQuery( + tsFileManager.getTsFileList(false, timePartitions, globalTimeFilter), + devicePathToAligned, + queryContext, + globalTimeFilter, + false); + + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, seqFileScanHandles.size()); + QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum( + UNSEQUENCE_TSFILE, unseqFileScanHandles.size()); + + QueryDataSourceForRegionScan dataSource = + new QueryDataSourceForRegionScan(seqFileScanHandles, unseqFileScanHandles); + dataSource.setDataTTL(dataTTL); + return dataSource; + } catch (MetadataException e) { + throw new QueryProcessException(e); + } + } + + private List<IFileScanHandle> getFileHandleListForQuery( + Collection<TsFileResource> tsFileResources, + Map<IDeviceID, Boolean> devicePathToAligned, + QueryContext context, + Filter globalTimeFilter, + boolean isSeq) + throws MetadataException { + + List<IFileScanHandle> fileScanHandles = new ArrayList<>(); + + long timeLowerBound = + dataTTL != Long.MAX_VALUE ? CommonDateTimeUtils.currentTime() - dataTTL : Long.MIN_VALUE; + context.setQueryTimeLowerBound(timeLowerBound); + + for (TsFileResource tsFileResource : tsFileResources) { + if (!tsFileResource.isSatisfied(null, globalTimeFilter, isSeq, dataTTL, context.isDebug())) { + continue; + } + closeQueryLock.readLock().lock(); + try { + if (tsFileResource.isClosed()) { + // Get all the modification in current device Review Comment: what's the meaning of this comment? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java: ########## @@ -140,10 +146,95 @@ public static void modifyAlignedChunkMetaData( }); } + public static boolean isPointDeleted(long timestamp, List<TimeRange> deletionList) { + Integer deleteCursor = 0; + return isPointDeleted(timestamp, deletionList, deleteCursor); + } + + public static boolean isPointDeleted( + long timestamp, List<TimeRange> deletionList, Integer deleteCursor) { Review Comment: ```suggestion long timestamp, List<TimeRange> deletionList, int deleteCursor) { ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java: ########## @@ -140,10 +146,95 @@ public static void modifyAlignedChunkMetaData( }); } + public static boolean isPointDeleted(long timestamp, List<TimeRange> deletionList) { + Integer deleteCursor = 0; Review Comment: ```suggestion int deleteCursor = 0; ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/SharedTimeDataBuffer.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.utils; + +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.header.ChunkHeader; +import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.reader.chunk.ChunkReader; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class SharedTimeDataBuffer { + private ByteBuffer timeBuffer; + private final IChunkMetadata timeChunkMetaData; + private ChunkHeader timeChunkHeader; + private final List<long[]> timeData; + private final Decoder defaultTimeDecoder = + Decoder.getDecoderByType( + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), + TSDataType.INT64); + + public SharedTimeDataBuffer(IChunkMetadata timeChunkMetaData) { + this.timeChunkMetaData = timeChunkMetaData; + this.timeData = new ArrayList<>(); + } + + // It should be called first before other methods in sharedTimeBuffer. + public void init(TsFileSequenceReader reader) throws IOException { + if (timeBuffer != null) { + return; + } + Chunk timeChunk = reader.readMemChunk(timeChunkMetaData.getOffsetOfChunkHeader()); + timeChunkHeader = timeChunk.getHeader(); + timeBuffer = timeChunk.getData(); + } + + public synchronized long[] getPageTime(int pageId) throws IOException { Review Comment: why need to be `synchronized`? and when did you know to stop reading? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java: ########## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; +import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; +import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractChunkOffset; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractDeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AlignedChunkOffset; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AlignedDeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.ChunkOffset; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.DeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; +import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator; +import org.apache.iotdb.db.utils.ModificationUtils; + +import org.apache.tsfile.file.metadata.AlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.read.TsFileDeviceIterator; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.utils.Pair; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class ClosedFileScanHandleImpl implements IFileScanHandle { + + private final TsFileResource tsFileResource; + private final QueryContext queryContext; + // Used to cache the modifications of each timeseries + private final Map<IDeviceID, Map<String, List<TimeRange>>> deviceToModifications; + + public ClosedFileScanHandleImpl(TsFileResource tsFileResource, QueryContext context) { + this.tsFileResource = tsFileResource; + this.queryContext = context; + this.deviceToModifications = new HashMap<>(); + } + + @Override + public TsFileDeviceStartEndTimeIterator getDeviceStartEndTimeIterator() throws IOException { + ITimeIndex timeIndex = tsFileResource.getTimeIndex(); + return timeIndex instanceof DeviceTimeIndex + ? new TsFileDeviceStartEndTimeIterator((DeviceTimeIndex) timeIndex) + : new TsFileDeviceStartEndTimeIterator(tsFileResource.buildDeviceTimeIndex()); + } + + @Override + public boolean[] isDeviceTimeDeleted(IDeviceID deviceID, long[] timeArray) + throws IllegalPathException { + boolean[] result = new boolean[2]; + List<Modification> modifications = queryContext.getPathModifications(tsFileResource, deviceID); + List<TimeRange> timeRangeList = + modifications.stream() + .filter(Deletion.class::isInstance) + .map(Deletion.class::cast) + .map(Deletion::getTimeRange) + .collect(Collectors.toList()); + + Integer deleteCursor = 0; + for (int i = 0; i < timeArray.length; i++) { + result[i] = ModificationUtils.isPointDeleted(timeArray[i], timeRangeList, deleteCursor); + } + return result; + } + + private boolean[] calculateBooleanArray(List<TimeRange> timeRangeList, long[] timeArray) { + boolean[] result = new boolean[timeArray.length]; + Integer deleteCursor = 0; + for (int i = 0; i < timeArray.length; i++) { + result[i] = ModificationUtils.isPointDeleted(timeArray[i], timeRangeList, deleteCursor); + } + return result; + } + + @Override + public boolean[] isTimeSeriesTimeDeleted( + IDeviceID deviceID, String timeSeriesName, long[] timeArray) throws IllegalPathException { + + if (deviceToModifications.containsKey(deviceID) + && deviceToModifications.get(deviceID).containsKey(timeSeriesName)) { + return calculateBooleanArray( + deviceToModifications.get(deviceID).get(timeSeriesName), timeArray); + } + + List<Modification> modifications = + queryContext.getPathModifications(tsFileResource, deviceID, timeSeriesName); + List<TimeRange> timeRangeList = + modifications.stream() + .filter(Deletion.class::isInstance) + .map(Deletion.class::cast) + .map(Deletion::getTimeRange) + .collect(Collectors.toList()); + deviceToModifications + .computeIfAbsent(deviceID, k -> new HashMap<>()) + .put(timeSeriesName, timeRangeList); + return calculateBooleanArray(timeRangeList, timeArray); + } + + @Override + public Iterator<AbstractDeviceChunkMetaData> getAllDeviceChunkMetaData() throws IOException { + + TsFileSequenceReader tsFileReader = FileReaderManager.getInstance().get(getFilePath(), true); + TsFileDeviceIterator deviceIterator = tsFileReader.getAllDevicesIteratorWithIsAligned(); + + List<AbstractDeviceChunkMetaData> deviceChunkMetaDataList = new LinkedList<>(); + // Traverse each device in current tsFile and get all the relating chunkMetaData + while (deviceIterator.hasNext()) { + Pair<IDeviceID, Boolean> deviceIDWithIsAligned = deviceIterator.next(); + Map<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> metadataForDevice = + tsFileReader.getTimeseriesMetadataOffsetByDevice( + deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), + Collections.emptySet(), + true); + if (!deviceIDWithIsAligned.right) { + // device is not aligned + deviceChunkMetaDataList.add( + new DeviceChunkMetaData( + deviceIDWithIsAligned.left, + metadataForDevice.values().stream() + .flatMap(pair -> pair.getLeft().stream()) + .collect(Collectors.toList()))); + } else { + // device is aligned + List<IChunkMetadata> timeChunkMetaData = metadataForDevice.get("").getLeft(); + List<List<IChunkMetadata>> valueMetaDataList = new ArrayList<>(); + for (Map.Entry<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> pair : + metadataForDevice.entrySet()) { + // Skip timeChunkMetaData + if (pair.getKey().isEmpty()) { + continue; + } + valueMetaDataList.add(pair.getValue().getLeft()); + } + + List<AlignedChunkMetadata> alignedDeviceChunkMetaData = new ArrayList<>(); + for (int i = 0; i < timeChunkMetaData.size(); i++) { + alignedDeviceChunkMetaData.add( + new AlignedChunkMetadata(timeChunkMetaData.get(i), valueMetaDataList.get(i))); + } + deviceChunkMetaDataList.add( + new AlignedDeviceChunkMetaData(deviceIDWithIsAligned.left, alignedDeviceChunkMetaData)); + } + } + return deviceChunkMetaDataList.iterator(); + } + + @Override + public Iterator<IChunkHandle> getChunkHandles( + List<AbstractChunkOffset> chunkInfoList, + List<Statistics<? extends Serializable>> statisticsList) { + String filePath = tsFileResource.getTsFilePath(); + List<IChunkHandle> chunkHandleList = new ArrayList<>(); + for (int i = 0; i < chunkInfoList.size(); i++) { + AbstractChunkOffset chunkOffset = chunkInfoList.get(i); + chunkHandleList.add( + chunkOffset instanceof ChunkOffset + ? new DiskChunkHandleImpl( + filePath, true, chunkOffset.getOffSet(), statisticsList.get(i)) + : new DiskAlignedChunkHandleImpl( + filePath, + true, + chunkOffset.getOffSet(), + statisticsList.get(i), + ((AlignedChunkOffset) chunkOffset).getSharedTimeDataBuffer())); Review Comment: why don't you put this method into `AbstractChunkOffset`? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/UnclosedFileScanHandleImpl.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl; + +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IFileScanHandle; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractChunkOffset; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractDeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AlignedDeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.ChunkOffset; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.DeviceChunkMetaData; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; +import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator; +import org.apache.iotdb.db.utils.ModificationUtils; + +import org.apache.tsfile.file.metadata.AlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.statistics.Statistics; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +public class UnclosedFileScanHandleImpl implements IFileScanHandle { + + private final TsFileResource tsFileResource; + private final Map<IDeviceID, Map<String, List<IChunkMetadata>>> deviceToChunkMetadataMap; + private final Map<IDeviceID, Map<String, List<IChunkHandle>>> deviceToMemChunkHandleMap; + + public UnclosedFileScanHandleImpl( + Map<IDeviceID, Map<String, List<IChunkMetadata>>> deviceToChunkMetadataMap, + Map<IDeviceID, Map<String, List<IChunkHandle>>> deviceToMemChunkHandleMap, + TsFileResource tsFileResource) { + this.deviceToChunkMetadataMap = deviceToChunkMetadataMap; + this.deviceToMemChunkHandleMap = deviceToMemChunkHandleMap; + this.tsFileResource = tsFileResource; + } + + @Override + public TsFileDeviceStartEndTimeIterator getDeviceStartEndTimeIterator() throws IOException { + ITimeIndex timeIndex = tsFileResource.getTimeIndex(); + return timeIndex instanceof DeviceTimeIndex + ? new TsFileDeviceStartEndTimeIterator((DeviceTimeIndex) timeIndex) + : new TsFileDeviceStartEndTimeIterator(tsFileResource.buildDeviceTimeIndex()); + } + + @Override + public boolean[] isDeviceTimeDeleted(IDeviceID deviceID, long[] timeArray) { + Map<String, List<IChunkMetadata>> chunkMetadataMap = deviceToChunkMetadataMap.get(deviceID); + boolean[] result = new boolean[timeArray.length]; + + chunkMetadataMap.values().stream() + .flatMap(List::stream) + .map(IChunkMetadata::getDeleteIntervalList) + .filter(deleteIntervalList -> !deleteIntervalList.isEmpty()) + .forEach( + timeRangeList -> { + Integer deleteCursor = 0; Review Comment: this won't be changed in `ModificationUtils.isPointDeleted` and why you use `IntStream.range(0,timeArray.length).forEach` this way to iterate timeArray? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
