jt2594838 commented on code in PR #12476:
URL: https://github.com/apache/iotdb/pull/12476#discussion_r1636207336
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java:
##########
@@ -190,33 +204,43 @@ private void loadNextSegmentV1() throws IOException {
}
private void loadNextSegmentV2() throws IOException {
- headerBuffer.clear();
- if (channel.read(headerBuffer) != Integer.BYTES + 1) {
- throw new IOException("Unexpected end of file");
- }
- // compressionType originalSize compressedSize
- headerBuffer.flip();
- CompressionType compressionType =
CompressionType.deserialize(headerBuffer.get());
- int dataBufferSize = headerBuffer.getInt();
- if (compressionType != CompressionType.UNCOMPRESSED) {
- compressedHeader.clear();
- if (channel.read(compressedHeader) != Integer.BYTES) {
- throw new IOException("Unexpected end of file");
+ SegmentInfo segmentInfo = getNextSegmentInfo();
+ if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) {
+ // A compressed segment
+ if (Objects.isNull(dataBuffer)
+ || dataBuffer.capacity() < segmentInfo.uncompressedSize
+ || dataBuffer.capacity() > segmentInfo.uncompressedSize * 2) {
+ dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
Review Comment:
If the previous buffer is not null, it should be released manually via
`MmapUtil.clean` before it is replaced; the same applies to the allocation below.
##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java:
##########
@@ -18,37 +18,392 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.compression;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.storageengine.dataregion.wal.WALTestUtils;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALBuffer;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALInfoEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALSignalEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.LogWriter;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALInputStream;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.compress.ICompressor;
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.PublicBAOS;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
+import org.junit.Test;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
public class WALCompressionTest {
private final File walFile =
new File(
TestConstant.BASE_OUTPUT_PATH.concat(
WALFileUtils.getLogFileName(0, 0,
WALFileStatus.CONTAINS_SEARCH_INDEX)));
+ private final String compressionDir =
+ TestConstant.OUTPUT_DATA_DIR.concat(File.separator + "wal-compression");
+
+ private final String devicePath = "root.sg.d1";
+ long originalMinCompressionSize;
+ CompressionType originCompressionType =
+ IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
+
@Before
- public void setUp() throws IOException {
- if (walFile.getParentFile().exists()) {
- Files.delete(walFile.getParentFile().toPath());
+ public void setUp()
+ throws IOException, NoSuchFieldException, ClassNotFoundException,
IllegalAccessException {
+ if (walFile.exists()) {
+ FileUtils.delete(walFile);
+ }
+ originalMinCompressionSize = WALTestUtils.getMinCompressionSize();
+ if (new File(compressionDir).exists()) {
+ FileUtils.forceDelete(new File(compressionDir));
}
- Files.createDirectory(walFile.getParentFile().toPath());
}
@After
- public void tearDown() throws IOException {
- if (walFile.getParentFile().exists()) {
- Files.delete(walFile.getParentFile().toPath());
+ public void tearDown()
+ throws IOException, NoSuchFieldException, ClassNotFoundException,
IllegalAccessException {
+ if (walFile.exists()) {
+ FileUtils.delete(walFile);
+ }
+ if (new File(compressionDir).exists()) {
+ FileUtils.forceDelete(new File(compressionDir));
+ }
+ WALTestUtils.setMinCompressionSize(originalMinCompressionSize);
+
IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(originCompressionType);
+ }
+
+ @Test
+ public void testSkipToGivenPositionWithCompression()
+ throws NoSuchFieldException,
+ ClassNotFoundException,
+ IllegalAccessException,
+ QueryProcessException,
+ IllegalPathException,
+ IOException {
+ WALTestUtils.setMinCompressionSize(0L);
+
IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(CompressionType.LZ4);
+ testSkipToGivenPosition();
+ }
+
+ @Test
+ public void testSkipToGivenPositionWithoutCompression()
+ throws NoSuchFieldException,
+ ClassNotFoundException,
+ IllegalAccessException,
+ QueryProcessException,
+ IllegalPathException,
+ IOException {
+ WALTestUtils.setMinCompressionSize(1024 * 32);
+ testSkipToGivenPosition();
+ }
+
+ public void testSkipToGivenPosition()
+ throws QueryProcessException, IllegalPathException, IOException {
+ LogWriter writer = new WALWriter(walFile);
+ ByteBuffer buffer = ByteBuffer.allocate(1024 * 4);
+ List<Pair<Long, InsertRowNode>> positionAndEntryPairList = new
ArrayList<>();
+ int memTableId = 0;
+ long fileOffset = 0;
+ for (int i = 0; i < 100; ) {
+ InsertRowNode insertRowNode = WALTestUtils.getInsertRowNode(devicePath +
memTableId, i);
+ long serializedSize = insertRowNode.serializedSize();
+ if (buffer.remaining() >= serializedSize) {
+ int pos = buffer.position();
+ insertRowNode.serialize(buffer);
+ positionAndEntryPairList.add(new Pair<>(fileOffset, insertRowNode));
+ fileOffset += buffer.position() - pos;
+ i++;
+ } else {
+ writer.write(buffer);
+ buffer.clear();
+ }
+ }
+ if (buffer.position() != 0) {
+ writer.write(buffer);
+ }
+ writer.close();
+ try (WALInputStream stream = new WALInputStream(walFile)) {
+ for (int i = 0; i < 100; ++i) {
+ Pair<Long, InsertRowNode> positionAndNodePair =
positionAndEntryPairList.get(i);
+ stream.skipToGivenPosition(positionAndNodePair.left);
+ /*
+ Add the allocated buffer size by 2, because the actual serialized
size
+ of InsertRowNode is larger than the estimated value got by
serializedSize.
+ I don't know if this is a bug or not.
+ */
+ ByteBuffer nodeBuffer1 =
+ ByteBuffer.allocate(positionAndNodePair.right.serializedSize() +
2);
+ stream.read(nodeBuffer1);
+ ByteBuffer nodeBuffer2 =
+ ByteBuffer.allocate(positionAndNodePair.right.serializedSize() +
2);
+ positionAndNodePair.right.serialize(nodeBuffer2);
+ nodeBuffer2.flip();
+ Assert.assertArrayEquals(nodeBuffer1.array(), nodeBuffer2.array());
+ }
+ }
+ }
+
+ @Test
+ public void testUncompressedWALStructure()
+ throws QueryProcessException, IllegalPathException, IOException {
+ PublicBAOS baos = new PublicBAOS();
+ DataOutputStream dataOutputStream = new DataOutputStream(baos);
+ List<InsertRowNode> insertRowNodes = new ArrayList<>();
+ for (int i = 0; i < 100; ++i) {
+ InsertRowNode node = WALTestUtils.getInsertRowNode(devicePath, i);
+ insertRowNodes.add(node);
+ node.serialize(dataOutputStream);
+ }
+ dataOutputStream.close();
+ ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
+ // Do not compress it
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setWALCompressionAlgorithm(CompressionType.UNCOMPRESSED);
+ try (WALWriter writer = new WALWriter(walFile)) {
+ buf.position(buf.limit());
+ writer.write(buf);
+ }
+
+ try (DataInputStream dataInputStream =
+ new DataInputStream(new
BufferedInputStream(Files.newInputStream(walFile.toPath())))) {
+ byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_BYTES];
+ // head magic string
+ dataInputStream.readFully(magicStringBytes);
+ Assert.assertEquals(WALWriter.MAGIC_STRING, new
String(magicStringBytes));
+ Assert.assertEquals(
+ CompressionType.UNCOMPRESSED,
CompressionType.deserialize(dataInputStream.readByte()));
+ Assert.assertEquals(buf.array().length, dataInputStream.readInt());
+ ByteBuffer dataBuf = ByteBuffer.allocate(buf.array().length);
+ dataInputStream.readFully(dataBuf.array());
+ Assert.assertArrayEquals(buf.array(), dataBuf.array());
+ Assert.assertEquals(
+ new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER),
+ WALEntry.deserialize(dataInputStream));
+ ByteBuffer metadataBuf = ByteBuffer.allocate(12 + Integer.BYTES);
+ dataInputStream.readFully(metadataBuf.array());
+ // Tail magic string
+ dataInputStream.readFully(magicStringBytes);
+ Assert.assertEquals(WALWriter.MAGIC_STRING, new
String(magicStringBytes));
+ }
+ }
+
+ @Test
+ public void testCompressedWALStructure()
+ throws IOException,
+ QueryProcessException,
+ IllegalPathException,
+ NoSuchFieldException,
+ ClassNotFoundException,
+ IllegalAccessException {
+ PublicBAOS baos = new PublicBAOS();
+ DataOutputStream dataOutputStream = new DataOutputStream(baos);
+ List<InsertRowNode> insertRowNodes = new ArrayList<>();
+ for (int i = 0; i < 100; ++i) {
+ InsertRowNode node = WALTestUtils.getInsertRowNode(devicePath, i);
+ insertRowNodes.add(node);
+ node.serialize(dataOutputStream);
+ }
+ dataOutputStream.close();
+ ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
+ // Do not compress it
+
IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(CompressionType.LZ4);
Review Comment:
"Do not compress it"? The comment above contradicts the code, which sets the compression algorithm to LZ4 here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]