jt2594838 commented on code in PR #415: URL: https://github.com/apache/tsfile/pull/415#discussion_r1970773601
########## java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java: ########## @@ -200,4 +214,164 @@ public Statistics getChunkStatistic() { public long getRetainedSizeInBytes() { return INSTANCE_SIZE + sizeOfByteArray(chunkData.capacity()); } + + public Chunk rewrite(TSDataType newType, boolean isValueChunk, Chunk timeChunk) + throws IOException { + if (newType == null || newType == chunkHeader.getDataType()) { + return this; + } + IMeasurementSchema schema = + new MeasurementSchema( + chunkHeader.getMeasurementID(), newType, chunkHeader.getEncodingType()); + if (isValueChunk) { + ValueChunkWriter chunkWriter = + new ValueChunkWriter( + chunkHeader.getMeasurementID(), + chunkHeader.getCompressionType(), + newType, + chunkHeader.getEncodingType(), + schema.getValueEncoder(), + encryptParam); + List<Chunk> valueChunks = new ArrayList<>(); + valueChunks.add(this); + AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunks); + List<IPageReader> pages = chunkReader.loadPageReaderList(); + for (IPageReader page : pages) { + IPointReader pointReader = ((AlignedPageReader) page).getLazyPointReader(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + Object convertedValue = null; + if (point.getValue().getVector()[0] != null) { + convertedValue = + newType.castFromSingleValue( + chunkHeader.getDataType(), point.getValue().getVector()[0].getValue()); + } + long timestamp = point.getTimestamp(); + switch (newType) { + case BOOLEAN: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (boolean) convertedValue, + convertedValue == null); + break; + case DATE: + case INT32: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (int) convertedValue, + convertedValue == null); + break; + case TIMESTAMP: + case INT64: + chunkWriter.write( + timestamp, + convertedValue == null ? 
null : (long) convertedValue, + convertedValue == null); + break; + case FLOAT: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (float) convertedValue, + convertedValue == null); + break; + case DOUBLE: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (double) convertedValue, + convertedValue == null); + break; + case TEXT: + case STRING: + case BLOB: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (Binary) convertedValue, + convertedValue == null); + break; + default: + throw new IOException("Unsupported data type: " + newType); + } + } + chunkWriter.sealCurrentPage(); + } + chunkWriter.sealCurrentPage(); + ByteBuffer newChunkData = chunkWriter.getByteBuffer(); + ChunkHeader newChunkHeader = + new ChunkHeader( + chunkHeader.getMeasurementID(), + newChunkData.capacity(), + newType, + chunkHeader.getCompressionType(), + chunkHeader.getEncodingType(), + chunkWriter.getNumOfPages(), + TsFileConstant.VALUE_COLUMN_MASK); Review Comment: chunkType should inherit from this one. ########## java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tsfile.write; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.header.ChunkHeader; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.read.common.BatchData; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.reader.IPageReader; +import org.apache.tsfile.read.reader.IPointReader; +import org.apache.tsfile.read.reader.chunk.AlignedChunkReader; +import org.apache.tsfile.read.reader.chunk.ChunkReader; +import org.apache.tsfile.read.reader.page.AlignedPageReader; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.tsfile.write.chunk.TimeChunkWriter; +import org.apache.tsfile.write.chunk.ValueChunkWriter; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.VectorMeasurementSchema; + +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class ChunkRewriteTest { + + @Test + public void AlignedChunkTest() throws IOException { + String[] measurements = new String[] {"s1", "s2", "s3"}; + TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32, TSDataType.DOUBLE}; + VectorMeasurementSchema measurementSchema = + new VectorMeasurementSchema("root.sg.d1", measurements, types); + AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(measurementSchema); + + for (int time = 1; time <= 20; time++) { + chunkWriter.write(time, (float) time, false); + chunkWriter.write(time, time, false); + chunkWriter.write(time, (double) time, false); + chunkWriter.write(time); + } + chunkWriter.sealCurrentPage(); + + TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter(); + 
List<ValueChunkWriter> valueChunkWriters = chunkWriter.getValueChunkWriterList(); + + ByteBuffer newChunkData = timeChunkWriter.getByteBuffer(); + ChunkHeader newChunkHeader = + new ChunkHeader( + measurementSchema.getMeasurementName(), + newChunkData.capacity(), + TSDataType.VECTOR, + measurementSchema.getCompressor(), + measurementSchema.getTimeTSEncoding(), + timeChunkWriter.getNumOfPages()); + Chunk timeChunk = + new Chunk(newChunkHeader, newChunkData, null, timeChunkWriter.getStatistics()); + + List<Chunk> valueChunks = new ArrayList<>(); + for (ValueChunkWriter valueChunkWriter : valueChunkWriters) { + ByteBuffer valueChunkData = valueChunkWriter.getByteBuffer(); + ChunkHeader valueChunkHeader = + new ChunkHeader( + valueChunkWriter.getMeasurementId(), + valueChunkData.capacity(), + valueChunkWriter.getDataType(), + valueChunkWriter.getCompressionType(), + valueChunkWriter.getEncodingType(), + valueChunkWriter.getNumOfPages()); + Chunk valueChunk = + new Chunk(valueChunkHeader, valueChunkData, null, valueChunkWriter.getStatistics()); + valueChunks.add(valueChunk); + } + AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunks); + List<IPageReader> pageReaders = chunkReader.loadPageReaderList(); + for (IPageReader page : pageReaders) { + IPointReader pointReader = ((AlignedPageReader) page).getLazyPointReader(); + int i = 1; + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals((float) i, point.getValue().getVector()[0].getValue()); + assertEquals(i, point.getValue().getVector()[1].getValue()); + assertEquals((double) i, point.getValue().getVector()[2].getValue()); + i++; + } + } + timeChunk.getData().flip(); + valueChunks.get(0).getData().flip(); + valueChunks.get(1).getData().flip(); + valueChunks.get(2).getData().flip(); + // rewrite INT32->DOUBLE + Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE, true, 
timeChunk); + valueChunks.set(1, newValueChunk); + AlignedChunkReader newChunkReader = new AlignedChunkReader(timeChunk, valueChunks); + List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList(); + for (IPageReader page : newPageReaders) { + IPointReader pointReader = ((AlignedPageReader) page).getLazyPointReader(); + int i = 1; + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals((float) i, point.getValue().getVector()[0].getValue()); + assertEquals((double) i, point.getValue().getVector()[1].getValue()); + assertEquals((double) i, point.getValue().getVector()[2].getValue()); + i++; + } + } Review Comment: Check the number of points. ########## java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java: ########## @@ -200,4 +214,164 @@ public Statistics getChunkStatistic() { public long getRetainedSizeInBytes() { return INSTANCE_SIZE + sizeOfByteArray(chunkData.capacity()); } + + public Chunk rewrite(TSDataType newType, boolean isValueChunk, Chunk timeChunk) + throws IOException { + if (newType == null || newType == chunkHeader.getDataType()) { + return this; + } + IMeasurementSchema schema = + new MeasurementSchema( + chunkHeader.getMeasurementID(), newType, chunkHeader.getEncodingType()); + if (isValueChunk) { Review Comment: Better to split into two methods. 
########## java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java: ########## @@ -200,4 +214,164 @@ public Statistics getChunkStatistic() { public long getRetainedSizeInBytes() { return INSTANCE_SIZE + sizeOfByteArray(chunkData.capacity()); } + + public Chunk rewrite(TSDataType newType, boolean isValueChunk, Chunk timeChunk) + throws IOException { + if (newType == null || newType == chunkHeader.getDataType()) { + return this; + } + IMeasurementSchema schema = + new MeasurementSchema( + chunkHeader.getMeasurementID(), newType, chunkHeader.getEncodingType()); + if (isValueChunk) { + ValueChunkWriter chunkWriter = + new ValueChunkWriter( + chunkHeader.getMeasurementID(), + chunkHeader.getCompressionType(), + newType, + chunkHeader.getEncodingType(), + schema.getValueEncoder(), + encryptParam); + List<Chunk> valueChunks = new ArrayList<>(); + valueChunks.add(this); + AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunks); Review Comment: Also, since AlignedChunkReader ignores timestamps whose values are all null, please use TableChunkReader, which does not. Otherwise, the new value chunk will be unable to align with the time chunk because of missing timestamps. ########## java/tsfile/src/main/java/org/apache/tsfile/file/metadata/AbstractAlignedChunkMetadata.java: ########## @@ -160,6 +160,17 @@ public TSDataType getDataType() { return timeChunkMetadata.getDataType(); } + @Override + public TSDataType getNewType() { + return timeChunkMetadata.getNewType(); + } Review Comment: Also throw an exception here.
########## java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java: ########## @@ -200,4 +214,164 @@ public Statistics getChunkStatistic() { public long getRetainedSizeInBytes() { return INSTANCE_SIZE + sizeOfByteArray(chunkData.capacity()); } + + public Chunk rewrite(TSDataType newType, boolean isValueChunk, Chunk timeChunk) + throws IOException { + if (newType == null || newType == chunkHeader.getDataType()) { + return this; + } + IMeasurementSchema schema = + new MeasurementSchema( + chunkHeader.getMeasurementID(), newType, chunkHeader.getEncodingType()); + if (isValueChunk) { + ValueChunkWriter chunkWriter = + new ValueChunkWriter( + chunkHeader.getMeasurementID(), + chunkHeader.getCompressionType(), + newType, + chunkHeader.getEncodingType(), + schema.getValueEncoder(), + encryptParam); + List<Chunk> valueChunks = new ArrayList<>(); + valueChunks.add(this); + AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunks); Review Comment: ```suggestion AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, Collections.singletonList(this)); ``` ########## java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java: ########## @@ -527,4 +527,21 @@ public void setLastPoint(boolean isLastPoint) { public PageWriter getPageWriter() { return pageWriter; } + + public int getNumOfPages() { + return numOfPages; + } + + public ByteBuffer getByteBuffer() { + ByteBuffer copy = ByteBuffer.allocate(pageBuffer.size()); + copy.put(pageBuffer.toByteArray()); + copy.flip(); + return copy; Review Comment: ```suggestion return ByteBuffer.wrap(pageBuffer.toByteArray()); ``` ########## java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java: ########## @@ -200,4 +214,164 @@ public Statistics getChunkStatistic() { public long getRetainedSizeInBytes() { return INSTANCE_SIZE + sizeOfByteArray(chunkData.capacity()); } + + public Chunk rewrite(TSDataType newType, boolean isValueChunk, Chunk timeChunk) + throws IOException 
{ + if (newType == null || newType == chunkHeader.getDataType()) { + return this; + } + IMeasurementSchema schema = + new MeasurementSchema( + chunkHeader.getMeasurementID(), newType, chunkHeader.getEncodingType()); Review Comment: Do not forget compression. ########## java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java: ########## @@ -200,4 +214,164 @@ public Statistics getChunkStatistic() { public long getRetainedSizeInBytes() { return INSTANCE_SIZE + sizeOfByteArray(chunkData.capacity()); } + + public Chunk rewrite(TSDataType newType, boolean isValueChunk, Chunk timeChunk) + throws IOException { + if (newType == null || newType == chunkHeader.getDataType()) { + return this; + } + IMeasurementSchema schema = + new MeasurementSchema( + chunkHeader.getMeasurementID(), newType, chunkHeader.getEncodingType()); + if (isValueChunk) { + ValueChunkWriter chunkWriter = + new ValueChunkWriter( + chunkHeader.getMeasurementID(), + chunkHeader.getCompressionType(), + newType, + chunkHeader.getEncodingType(), + schema.getValueEncoder(), + encryptParam); + List<Chunk> valueChunks = new ArrayList<>(); + valueChunks.add(this); + AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunks); + List<IPageReader> pages = chunkReader.loadPageReaderList(); + for (IPageReader page : pages) { + IPointReader pointReader = ((AlignedPageReader) page).getLazyPointReader(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + Object convertedValue = null; + if (point.getValue().getVector()[0] != null) { + convertedValue = + newType.castFromSingleValue( + chunkHeader.getDataType(), point.getValue().getVector()[0].getValue()); + } + long timestamp = point.getTimestamp(); + switch (newType) { + case BOOLEAN: + chunkWriter.write( + timestamp, + convertedValue == null ? 
null : (boolean) convertedValue, + convertedValue == null); + break; + case DATE: + case INT32: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (int) convertedValue, + convertedValue == null); + break; + case TIMESTAMP: + case INT64: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (long) convertedValue, + convertedValue == null); + break; + case FLOAT: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (float) convertedValue, + convertedValue == null); + break; + case DOUBLE: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (double) convertedValue, + convertedValue == null); + break; + case TEXT: + case STRING: + case BLOB: + chunkWriter.write( + timestamp, + convertedValue == null ? null : (Binary) convertedValue, + convertedValue == null); + break; + default: + throw new IOException("Unsupported data type: " + newType); + } + } + chunkWriter.sealCurrentPage(); + } + chunkWriter.sealCurrentPage(); + ByteBuffer newChunkData = chunkWriter.getByteBuffer(); + ChunkHeader newChunkHeader = + new ChunkHeader( + chunkHeader.getMeasurementID(), + newChunkData.capacity(), + newType, + chunkHeader.getCompressionType(), + chunkHeader.getEncodingType(), + chunkWriter.getNumOfPages(), + TsFileConstant.VALUE_COLUMN_MASK); + chunkData.flip(); + timeChunk.chunkData.flip(); + return new Chunk( + newChunkHeader, + newChunkData, + deleteIntervalList, + chunkWriter.getStatistics(), + encryptParam); + } + ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema, encryptParam); + ChunkReader chunkReader = new ChunkReader(this); + List<IPageReader> pages = chunkReader.loadPageReaderList(); + for (IPageReader page : pages) { + BatchData batchData = page.getAllSatisfiedPageData(); + IPointReader pointReader = batchData.getBatchDataIterator(); + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + Object convertedValue = + 
newType.castFromSingleValue(chunkHeader.getDataType(), point.getValue().getValue()); + long timestamp = point.getTimestamp(); + if (convertedValue == null) { + continue; + } Review Comment: In what case convertedValue will be a null? ########## java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tsfile.write; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.header.ChunkHeader; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.read.common.BatchData; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.reader.IPageReader; +import org.apache.tsfile.read.reader.IPointReader; +import org.apache.tsfile.read.reader.chunk.AlignedChunkReader; +import org.apache.tsfile.read.reader.chunk.ChunkReader; +import org.apache.tsfile.read.reader.page.AlignedPageReader; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.tsfile.write.chunk.TimeChunkWriter; +import org.apache.tsfile.write.chunk.ValueChunkWriter; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.VectorMeasurementSchema; + +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class ChunkRewriteTest { + + @Test + public void AlignedChunkTest() throws IOException { + String[] measurements = new String[] {"s1", "s2", "s3"}; + TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32, TSDataType.DOUBLE}; + VectorMeasurementSchema measurementSchema = + new VectorMeasurementSchema("root.sg.d1", measurements, types); + AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(measurementSchema); + + for (int time = 1; time <= 20; time++) { + chunkWriter.write(time, (float) time, false); + chunkWriter.write(time, time, false); + chunkWriter.write(time, (double) time, false); + chunkWriter.write(time); + } + chunkWriter.sealCurrentPage(); + + TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter(); + 
List<ValueChunkWriter> valueChunkWriters = chunkWriter.getValueChunkWriterList(); + + ByteBuffer newChunkData = timeChunkWriter.getByteBuffer(); + ChunkHeader newChunkHeader = + new ChunkHeader( + measurementSchema.getMeasurementName(), + newChunkData.capacity(), + TSDataType.VECTOR, + measurementSchema.getCompressor(), + measurementSchema.getTimeTSEncoding(), + timeChunkWriter.getNumOfPages()); + Chunk timeChunk = + new Chunk(newChunkHeader, newChunkData, null, timeChunkWriter.getStatistics()); + + List<Chunk> valueChunks = new ArrayList<>(); + for (ValueChunkWriter valueChunkWriter : valueChunkWriters) { + ByteBuffer valueChunkData = valueChunkWriter.getByteBuffer(); + ChunkHeader valueChunkHeader = + new ChunkHeader( + valueChunkWriter.getMeasurementId(), + valueChunkData.capacity(), + valueChunkWriter.getDataType(), + valueChunkWriter.getCompressionType(), + valueChunkWriter.getEncodingType(), + valueChunkWriter.getNumOfPages()); + Chunk valueChunk = + new Chunk(valueChunkHeader, valueChunkData, null, valueChunkWriter.getStatistics()); + valueChunks.add(valueChunk); + } + AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, valueChunks); + List<IPageReader> pageReaders = chunkReader.loadPageReaderList(); + for (IPageReader page : pageReaders) { + IPointReader pointReader = ((AlignedPageReader) page).getLazyPointReader(); + int i = 1; + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals((float) i, point.getValue().getVector()[0].getValue()); + assertEquals(i, point.getValue().getVector()[1].getValue()); + assertEquals((double) i, point.getValue().getVector()[2].getValue()); + i++; + } + } + timeChunk.getData().flip(); + valueChunks.get(0).getData().flip(); + valueChunks.get(1).getData().flip(); + valueChunks.get(2).getData().flip(); + // rewrite INT32->DOUBLE + Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE, true, 
timeChunk); + valueChunks.set(1, newValueChunk); + AlignedChunkReader newChunkReader = new AlignedChunkReader(timeChunk, valueChunks); + List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList(); + for (IPageReader page : newPageReaders) { + IPointReader pointReader = ((AlignedPageReader) page).getLazyPointReader(); + int i = 1; + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals((float) i, point.getValue().getVector()[0].getValue()); + assertEquals((double) i, point.getValue().getVector()[1].getValue()); + assertEquals((double) i, point.getValue().getVector()[2].getValue()); + i++; + } + } + timeChunk.getData().flip(); + valueChunks.get(0).getData().flip(); + valueChunks.get(1).getData().flip(); + valueChunks.get(2).getData().flip(); + + // + + } + + @Test + public void NonAlignedChunkTest() throws IOException { + IMeasurementSchema schema = new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.PLAIN); + ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema); + for (int time = 1; time <= 20; time++) { + chunkWriter.write(time, time); + } + chunkWriter.sealCurrentPage(); + ByteBuffer newChunkData = chunkWriter.getByteBuffer(); + ChunkHeader newChunkHeader = + new ChunkHeader( + schema.getMeasurementName(), + newChunkData.capacity(), + schema.getType(), + schema.getCompressor(), + schema.getEncodingType(), + chunkWriter.getNumOfPages()); + Chunk newChunk = new Chunk(newChunkHeader, newChunkData, null, chunkWriter.getStatistics()); + ChunkReader chunkReader = new ChunkReader(newChunk); + List<IPageReader> pageReaders = chunkReader.loadPageReaderList(); + for (IPageReader page : pageReaders) { + BatchData data = page.getAllSatisfiedPageData(true); + IPointReader pointReader = data.getBatchDataIterator(); + int i = 1; + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + 
assertEquals((long) i, point.getTimestamp()); + assertEquals(i, point.getValue().getValue()); + i++; + } + } + newChunk.getData().flip(); + // rewrite FLOAT->DOUBLE + Chunk newChunk2 = newChunk.rewrite(TSDataType.DOUBLE, false, null); + ChunkReader chunkReader2 = new ChunkReader(newChunk2); + List<IPageReader> pageReaders2 = chunkReader2.loadPageReaderList(); + for (IPageReader page : pageReaders2) { + BatchData data = page.getAllSatisfiedPageData(true); + IPointReader pointReader = data.getBatchDataIterator(); + int i = 1; + while (pointReader.hasNextTimeValuePair()) { + TimeValuePair point = pointReader.nextTimeValuePair(); + assertEquals((long) i, point.getTimestamp()); + assertEquals((double) i, point.getValue().getValue()); + i++; + } + } + } +} Review Comment: Add three tests: 1. aligned with more than one page; 2. non-aligned with more than one page; 3. aligned with some nulls. ########## java/tsfile/src/main/java/org/apache/tsfile/write/chunk/ChunkWriterImpl.java: ########## @@ -527,4 +527,21 @@ public void setLastPoint(boolean isLastPoint) { public PageWriter getPageWriter() { return pageWriter; } + + public int getNumOfPages() { + return numOfPages; + } + + public ByteBuffer getByteBuffer() { + ByteBuffer copy = ByteBuffer.allocate(pageBuffer.size()); + copy.put(pageBuffer.toByteArray()); + copy.flip(); + return copy; Review Comment: The same for other places. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@tsfile.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org