jt2594838 commented on code in PR #415: URL: https://github.com/apache/tsfile/pull/415#discussion_r1970846296
########## java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java: ########## @@ -200,4 +214,164 @@ public Statistics getChunkStatistic() { public long getRetainedSizeInBytes() { return INSTANCE_SIZE + sizeOfByteArray(chunkData.capacity()); } + + public Chunk rewrite(TSDataType newType, boolean isValueChunk, Chunk timeChunk) + throws IOException { + if (newType == null || newType == chunkHeader.getDataType()) { + return this; + } + IMeasurementSchema schema = + new MeasurementSchema( + chunkHeader.getMeasurementID(), newType, chunkHeader.getEncodingType()); + if (isValueChunk) { + ValueChunkWriter chunkWriter = + new ValueChunkWriter( + chunkHeader.getMeasurementID(), + chunkHeader.getCompressionType(), + newType, + chunkHeader.getEncodingType(), + schema.getValueEncoder(), + encryptParam); + List<Chunk> valueChunks = new ArrayList<>(); + valueChunks.add(this); + AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunks); + List<IPageReader> pages = chunkReader.loadPageReaderList(); + for (IPageReader page : pages) { + IPointReader pointReader = ((AlignedPageReader) page).getLazyPointReader(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + Object convertedValue = null; + if (point.getValue().getVector()[0] != null) { + convertedValue = + newType.castFromSingleValue( + chunkHeader.getDataType(), point.getValue().getVector()[0].getValue()); + } + long timestamp = point.getTimestamp(); + switch (newType) { + case BOOLEAN: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (boolean) convertedValue, + convertedValue == null); + break; + case DATE: + case INT32: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (int) convertedValue, + convertedValue == null); + break; + case TIMESTAMP: + case INT64: + chunkWriter.write( + timestamp, + convertedValue == null ? 
null : (long) convertedValue, + convertedValue == null); + break; + case FLOAT: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (float) convertedValue, + convertedValue == null); + break; + case DOUBLE: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (double) convertedValue, + convertedValue == null); + break; + case TEXT: + case STRING: + case BLOB: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (Binary) convertedValue, + convertedValue == null); + break; + default: + throw new IOException("Unsupported data type: " + newType); + } + } + chunkWriter.sealCurrentPage(); + } + chunkWriter.sealCurrentPage(); + ByteBuffer newChunkData = chunkWriter.getByteBuffer(); + ChunkHeader newChunkHeader = + new ChunkHeader( + chunkHeader.getMeasurementID(), + newChunkData.capacity(), + newType, + chunkHeader.getCompressionType(), + chunkHeader.getEncodingType(), + chunkWriter.getNumOfPages(), + TsFileConstant.VALUE_COLUMN_MASK); + chunkData.flip(); + timeChunk.chunkData.flip(); + return new Chunk( + newChunkHeader, + newChunkData, + deleteIntervalList, + chunkWriter.getStatistics(), + encryptParam); + } + ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema, encryptParam); + ChunkReader chunkReader = new ChunkReader(this); + List<IPageReader> pages = chunkReader.loadPageReaderList(); + for (IPageReader page : pages) { + BatchData batchData = page.getAllSatisfiedPageData(); + IPointReader pointReader = batchData.getBatchDataIterator(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + Object convertedValue = + newType.castFromSingleValue(chunkHeader.getDataType(), point.getValue().getValue()); + long timestamp = point.getTimestamp(); + if (convertedValue == null) { + continue; + } Review Comment: NonAlignedChunk should not contain any null. You may add an error log here just in case. 
########## java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java: ########## @@ -200,4 +214,164 @@ public Statistics getChunkStatistic() { public long getRetainedSizeInBytes() { return INSTANCE_SIZE + sizeOfByteArray(chunkData.capacity()); } + + public Chunk rewrite(TSDataType newType, boolean isValueChunk, Chunk timeChunk) + throws IOException { + if (newType == null || newType == chunkHeader.getDataType()) { + return this; + } + IMeasurementSchema schema = + new MeasurementSchema( + chunkHeader.getMeasurementID(), newType, chunkHeader.getEncodingType()); + if (isValueChunk) { + ValueChunkWriter chunkWriter = + new ValueChunkWriter( + chunkHeader.getMeasurementID(), + chunkHeader.getCompressionType(), + newType, + chunkHeader.getEncodingType(), + schema.getValueEncoder(), + encryptParam); + List<Chunk> valueChunks = new ArrayList<>(); + valueChunks.add(this); + AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunks); + List<IPageReader> pages = chunkReader.loadPageReaderList(); + for (IPageReader page : pages) { + IPointReader pointReader = ((AlignedPageReader) page).getLazyPointReader(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + Object convertedValue = null; + if (point.getValue().getVector()[0] != null) { + convertedValue = + newType.castFromSingleValue( + chunkHeader.getDataType(), point.getValue().getVector()[0].getValue()); + } + long timestamp = point.getTimestamp(); + switch (newType) { + case BOOLEAN: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (boolean) convertedValue, + convertedValue == null); + break; + case DATE: + case INT32: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (int) convertedValue, + convertedValue == null); + break; + case TIMESTAMP: + case INT64: + chunkWriter.write( + timestamp, + convertedValue == null ? 
null : (long) convertedValue, + convertedValue == null); + break; + case FLOAT: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (float) convertedValue, + convertedValue == null); + break; + case DOUBLE: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (double) convertedValue, + convertedValue == null); + break; + case TEXT: + case STRING: + case BLOB: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (Binary) convertedValue, + convertedValue == null); + break; + default: + throw new IOException("Unsupported data type: " + newType); + } + } + chunkWriter.sealCurrentPage(); + } + chunkWriter.sealCurrentPage(); + ByteBuffer newChunkData = chunkWriter.getByteBuffer(); + ChunkHeader newChunkHeader = + new ChunkHeader( + chunkHeader.getMeasurementID(), + newChunkData.capacity(), + newType, + chunkHeader.getCompressionType(), + chunkHeader.getEncodingType(), + chunkWriter.getNumOfPages(), + TsFileConstant.VALUE_COLUMN_MASK); + chunkData.flip(); + timeChunk.chunkData.flip(); + return new Chunk( + newChunkHeader, + newChunkData, + deleteIntervalList, + chunkWriter.getStatistics(), + encryptParam); + } + ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema, encryptParam); + ChunkReader chunkReader = new ChunkReader(this); + List<IPageReader> pages = chunkReader.loadPageReaderList(); + for (IPageReader page : pages) { + BatchData batchData = page.getAllSatisfiedPageData(); + IPointReader pointReader = batchData.getBatchDataIterator(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + Object convertedValue = + newType.castFromSingleValue(chunkHeader.getDataType(), point.getValue().getValue()); + long timestamp = point.getTimestamp(); + if (convertedValue == null) { + continue; + } Review Comment: NonAlignedChunk should not contain any null. You may add an error log here, just in case. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscribe@tsfile.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org