jt2594838 commented on code in PR #498: URL: https://github.com/apache/tsfile/pull/498#discussion_r2101487112
########## java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java: ########## @@ -115,26 +139,90 @@ public Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next() { } private List<Pair<String, TimeValuePair>> convertToLastPoints( - List<TimeseriesMetadata> timeseriesMetadataList) { - return timeseriesMetadataList.stream() - .map( - seriesMeta -> - new Pair<>( - seriesMeta.getMeasurementId(), - new TimeValuePair( - seriesMeta.getStatistics().getEndTime(), - TsPrimitiveType.getByType( - seriesMeta.getTsDataType() == TSDataType.VECTOR - ? TSDataType.INT64 - : seriesMeta.getTsDataType(), - seriesMeta.getTsDataType() == TSDataType.VECTOR - ? seriesMeta.getStatistics().getEndTime() - : seriesMeta.getStatistics().getLastValue())))) - .collect(Collectors.toList()); + List<TimeseriesMetadata> timeseriesMetadataList) throws IOException { + boolean isAligned = timeseriesMetadataList.get(0).getTsDataType() == TSDataType.VECTOR; + List<Pair<String, TimeValuePair>> list = new ArrayList<>(); + for (TimeseriesMetadata meta : timeseriesMetadataList) { + Pair<String, TimeValuePair> stringTimeValuePairPair = convertToLastPoint(meta, isAligned); + list.add(stringTimeValuePairPair); + } + return list; + } + + private TimeValuePair readNonAlignedLastPoint(Chunk chunk) throws IOException { + ChunkReader chunkReader = new ChunkReader(chunk); + BatchData batchData = null; + while (chunkReader.hasNextSatisfiedPage()) { + batchData = chunkReader.nextPageData(); + } + if (batchData != null) { + return batchData.getLastPairBeforeOrEqualTimestamp(Long.MAX_VALUE); + } else { + return null; + } + } + + private TimeValuePair readAlignedLastPoint(Chunk chunk, ChunkMetadata chunkMetadata, long endTime) throws IOException { + ByteBuffer chunkData = chunk.getData(); + PageHeader lastPageHeader = null; + ByteBuffer lastPageData = null; + while (chunkData.hasRemaining()) { + if (chunk.isSinglePageChunk()) { + lastPageHeader = PageHeader.deserializeFrom(chunkData, 
chunkMetadata.getStatistics()); + } else { + lastPageHeader = PageHeader.deserializeFrom(chunkData, TSDataType.BLOB); + } + lastPageData = chunkData.slice(); + chunkData.position(chunkData.position() + lastPageHeader.getCompressedSize()); + } + if (lastPageHeader != null) { + CompressionType compressionType = chunk.getHeader().getCompressionType(); + if (compressionType != CompressionType.UNCOMPRESSED) { + ByteBuffer uncompressedPage = ByteBuffer.allocate(lastPageHeader.getUncompressedSize()); + IUnCompressor.getUnCompressor(compressionType).uncompress(lastPageData, uncompressedPage); + lastPageData = uncompressedPage; + lastPageData.flip(); + } + + ValuePageReader valuePageReader = new ValuePageReader(lastPageHeader, lastPageData, TSDataType.BLOB, + Decoder.getDecoderByType(chunk.getHeader().getEncodingType(), TSDataType.BLOB)); + TsPrimitiveType lastValue = null; + for (int i = 0; i < valuePageReader.getSize(); i++) { + // the timestamp here is not necessary + lastValue = valuePageReader.nextValue(0, i); + } + return new TimeValuePair(endTime, lastValue); + } else { + return null; + } + } + + private Pair<String, TimeValuePair> convertToLastPoint( + TimeseriesMetadata seriesMeta, boolean isAligned) throws IOException { + if (seriesMeta.getTsDataType() != TSDataType.BLOB) { + return new Pair<>( + seriesMeta.getMeasurementId(), + new TimeValuePair( + seriesMeta.getStatistics().getEndTime(), + seriesMeta.getTsDataType() == TSDataType.VECTOR ? + TsPrimitiveType.getByType(TSDataType.INT64, seriesMeta.getStatistics().getEndTime()) : + TsPrimitiveType.getByType(seriesMeta.getTsDataType(), + seriesMeta.getStatistics().getLastValue()))); + } else { + ChunkMetadata chunkMetadata = (ChunkMetadata) seriesMeta.getChunkMetadataList() Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: notifications-unsubscribe@tsfile.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org