jt2594838 commented on code in PR #12476:
URL: https://github.com/apache/iotdb/pull/12476#discussion_r1621686309
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java:
##########
@@ -43,23 +45,75 @@ public abstract class LogWriter implements ILogWriter {
protected final File logFile;
protected final FileOutputStream logStream;
protected final FileChannel logChannel;
- protected long size;
+ protected long size = 0;
+ protected long originalSize = 0;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES *
2 + 1);
+ private static final CompressionType compressionAlg =
+ IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
+ private final ICompressor compressor =
ICompressor.getCompressor(compressionAlg);
+ private final ByteBuffer compressedByteBuffer;
+ // Minimum size to compress, default is 32 KB
+ private static final long MIN_COMPRESS_SIZE = 0;
Review Comment:
It does not look like 32 KB: the constant is set to 0, which contradicts the comment above it.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java:
##########
@@ -64,8 +67,12 @@ public boolean hasNext() {
public ByteBuffer next() throws IOException {
int size = sizeIterator.next();
ByteBuffer buffer = ByteBuffer.allocate(size);
Review Comment:
Maybe the buffer can be reused (e.g., make it a parameter of the function
and use it if its size fits) to reduce memory allocation.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java:
##########
@@ -75,6 +76,9 @@ protected IWALNode createWALNode(String identifier, String
folder) {
} catch (FileNotFoundException e) {
logger.error("Fail to create wal node", e);
return WALFakeNode.getFailureInstance(e);
+ } catch (IOException e) {
+ logger.error("Meet exception when creating wal node", e);
+ return WALFakeNode.getFailureInstance(e);
}
Review Comment:
FileNotFoundException is a subclass of IOException, and the two branches do not
seem very different to me.
Maybe you can merge them into one.
The same applies to the similar code below.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Objects;
+
+public class WALInputStream extends InputStream implements AutoCloseable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(WALInputStream.class);
+ private final FileChannel channel;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES +
1);
+ private final ByteBuffer compressedHeader =
ByteBuffer.allocate(Integer.BYTES);
+ private ByteBuffer dataBuffer = null;
+ private long fileSize;
+ File logFile;
+ private long endOffset = -1;
+
+ enum FileVersion {
+ V1,
+ V2,
+ UNKNOWN
+ };
+
+ FileVersion version;
+
+ public WALInputStream(File logFile) throws IOException {
+ channel = FileChannel.open(logFile.toPath());
+ fileSize = channel.size();
+ analyzeFileVersion();
+ getEndOffset();
+ this.logFile = logFile;
+ }
+
+ private void getEndOffset() throws IOException {
+ if (channel.size() < WALWriter.MAGIC_STRING_BYTES + Integer.BYTES) {
+ endOffset = channel.size();
+ return;
+ }
+ ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
+ long position;
+ try {
+ if (version == FileVersion.V2) {
+ ByteBuffer magicStringBuffer =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+ channel.read(magicStringBuffer, channel.size() -
WALWriter.MAGIC_STRING_BYTES);
+ magicStringBuffer.flip();
+ if (!new
String(magicStringBuffer.array()).equals(WALWriter.MAGIC_STRING)) {
+ // this is a broken wal file
+ endOffset = channel.size();
+ return;
+ }
+ position = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES;
+ } else {
+ ByteBuffer magicStringBuffer =
+ ByteBuffer.allocate(WALWriter.MAGIC_STRING_V1.getBytes().length);
+ channel.read(
+ magicStringBuffer, channel.size() -
WALWriter.MAGIC_STRING_V1.getBytes().length);
+ magicStringBuffer.flip();
+ if (!new
String(magicStringBuffer.array()).equals(WALWriter.MAGIC_STRING_V1)) {
+ // this is a broken wal file
+ endOffset = channel.size();
+ return;
+ }
+ position = channel.size() -
WALWriter.MAGIC_STRING_V1.getBytes().length - Integer.BYTES;
+ }
+ channel.read(metadataSizeBuf, position);
+ metadataSizeBuf.flip();
+ int metadataSize = metadataSizeBuf.getInt();
+ endOffset = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES - metadataSize - 1;
+ } finally {
+ channel.position(WALWriter.MAGIC_STRING_BYTES);
+ }
+ }
+
+ private void analyzeFileVersion() throws IOException {
+ if (channel.size() < WALWriter.MAGIC_STRING_BYTES) {
+ version = FileVersion.UNKNOWN;
+ return;
+ }
+ if (isCurrentVersion()) {
+ this.version = FileVersion.V2;
+ return;
+ }
+ this.version = FileVersion.V1;
+ }
+
+ private boolean isCurrentVersion() throws IOException {
+ channel.position(0);
+ ByteBuffer buffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+ channel.read(buffer);
+ return new String(buffer.array()).equals(WALWriter.MAGIC_STRING);
Review Comment:
It is better to specify the string encoding explicitly (e.g., StandardCharsets.UTF_8) everywhere bytes and Strings are converted.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Objects;
+
+public class WALInputStream extends InputStream implements AutoCloseable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(WALInputStream.class);
+ private final FileChannel channel;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES +
1);
+ private final ByteBuffer compressedHeader =
ByteBuffer.allocate(Integer.BYTES);
+ private ByteBuffer dataBuffer = null;
+ private long fileSize;
+ File logFile;
+ private long endOffset = -1;
+
+ enum FileVersion {
+ V1,
+ V2,
+ UNKNOWN
+ };
+
+ FileVersion version;
+
+ public WALInputStream(File logFile) throws IOException {
+ channel = FileChannel.open(logFile.toPath());
+ fileSize = channel.size();
+ analyzeFileVersion();
+ getEndOffset();
+ this.logFile = logFile;
+ }
+
+ private void getEndOffset() throws IOException {
+ if (channel.size() < WALWriter.MAGIC_STRING_BYTES + Integer.BYTES) {
+ endOffset = channel.size();
+ return;
+ }
+ ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
+ long position;
+ try {
+ if (version == FileVersion.V2) {
+ ByteBuffer magicStringBuffer =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+ channel.read(magicStringBuffer, channel.size() -
WALWriter.MAGIC_STRING_BYTES);
+ magicStringBuffer.flip();
+ if (!new
String(magicStringBuffer.array()).equals(WALWriter.MAGIC_STRING)) {
+ // this is a broken wal file
+ endOffset = channel.size();
+ return;
+ }
+ position = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES;
+ } else {
+ ByteBuffer magicStringBuffer =
+ ByteBuffer.allocate(WALWriter.MAGIC_STRING_V1.getBytes().length);
+ channel.read(
+ magicStringBuffer, channel.size() -
WALWriter.MAGIC_STRING_V1.getBytes().length);
+ magicStringBuffer.flip();
+ if (!new
String(magicStringBuffer.array()).equals(WALWriter.MAGIC_STRING_V1)) {
+ // this is a broken wal file
+ endOffset = channel.size();
+ return;
+ }
+ position = channel.size() -
WALWriter.MAGIC_STRING_V1.getBytes().length - Integer.BYTES;
+ }
+ channel.read(metadataSizeBuf, position);
+ metadataSizeBuf.flip();
+ int metadataSize = metadataSizeBuf.getInt();
+ endOffset = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES - metadataSize - 1;
+ } finally {
+ channel.position(WALWriter.MAGIC_STRING_BYTES);
+ }
+ }
+
+ private void analyzeFileVersion() throws IOException {
+ if (channel.size() < WALWriter.MAGIC_STRING_BYTES) {
+ version = FileVersion.UNKNOWN;
+ return;
+ }
+ if (isCurrentVersion()) {
+ this.version = FileVersion.V2;
+ return;
+ }
+ this.version = FileVersion.V1;
+ }
+
+ private boolean isCurrentVersion() throws IOException {
+ channel.position(0);
+ ByteBuffer buffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+ channel.read(buffer);
+ return new String(buffer.array()).equals(WALWriter.MAGIC_STRING);
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (Objects.isNull(dataBuffer) || dataBuffer.position() >=
dataBuffer.limit()) {
+ loadNextSegment();
+ }
+ return dataBuffer.get() & 0xFF;
Review Comment:
What is this for?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java:
##########
@@ -345,12 +344,13 @@ public long getFirstValidWALVersionId() {
}
/** Update wal disk cost of active memTables. */
- public void updateCostOfActiveMemTables(Map<Long, Long>
memTableId2WalDiskUsage) {
+ public void updateCostOfActiveMemTables(
+ Map<Long, Long> memTableId2WalDiskUsage, double compressionRate) {
for (Map.Entry<Long, Long> memTableWalUsage :
memTableId2WalDiskUsage.entrySet()) {
memTableId2Info.computeIfPresent(
memTableWalUsage.getKey(),
(k, v) -> {
- v.addWalDiskUsage(memTableWalUsage.getValue());
+ v.addWalDiskUsage((long) (memTableWalUsage.getValue() *
compressionRate));
Review Comment:
Is it difficult to obtain the actual disk usage?
The compression ratio is already calculated from it, so I wonder why
you estimate it again here (which results in a less precise value).
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java:
##########
@@ -521,8 +520,9 @@ public void run() {
forceFlag, syncingBuffer.position(), syncingBuffer.capacity(),
usedRatio * 100);
// flush buffer to os
+ double compressionRate = 1.0;
Review Comment:
Beware of the difference between "rate" and "ratio".
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java:
##########
@@ -535,12 +535,13 @@ public void run() {
memTableIdsOfWal
.computeIfAbsent(currentWALFileVersion, memTableIds -> new
HashSet<>())
.addAll(info.metaData.getMemTablesId());
-
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage);
+
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage,
compressionRate);
boolean forceSuccess = false;
// try to roll log writer
if (info.rollWALFileWriterListener != null
- || (forceFlag && currentWALFileWriter.size() >=
config.getWalFileSizeThresholdInByte())) {
+ || (forceFlag
+ && currentWALFileWriter.originalSize() >=
config.getWalFileSizeThresholdInByte())) {
Review Comment:
Using originalSize to decide when to force may result in small actual IOs if
the compression ratio is high.
I guess this is to limit the risk of losing persistence.
It would be better to add a comment explaining why you chose
originalSize over actualSize here.
Ideally, I would expect some experiments showing that originalSize is
better than actualSize, or at least quantifying the trade-off.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java:
##########
@@ -521,8 +520,9 @@ public void run() {
forceFlag, syncingBuffer.position(), syncingBuffer.capacity(),
usedRatio * 100);
// flush buffer to os
+ double compressionRate = 1.0;
try {
- currentWALFileWriter.write(syncingBuffer, info.metaData);
+ compressionRate = currentWALFileWriter.write(syncingBuffer,
info.metaData);
Review Comment:
Aside from the compression ratio, the compression throughput is another key
metric that should also be recorded.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java:
##########
@@ -43,23 +45,75 @@ public abstract class LogWriter implements ILogWriter {
protected final File logFile;
protected final FileOutputStream logStream;
protected final FileChannel logChannel;
- protected long size;
+ protected long size = 0;
+ protected long originalSize = 0;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES *
2 + 1);
+ private static final CompressionType compressionAlg =
+ IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
+ private final ICompressor compressor =
ICompressor.getCompressor(compressionAlg);
+ private final ByteBuffer compressedByteBuffer;
+ // Minimum size to compress, default is 32 KB
+ private static final long MIN_COMPRESS_SIZE = 0;
- protected LogWriter(File logFile) throws FileNotFoundException {
+ protected LogWriter(File logFile) throws IOException {
this.logFile = logFile;
this.logStream = new FileOutputStream(logFile, true);
this.logChannel = this.logStream.getChannel();
+ if (!logFile.exists() || logFile.length() == 0) {
+
this.logChannel.write(ByteBuffer.wrap(WALWriter.MAGIC_STRING.getBytes()));
+ size += logChannel.position();
+ }
+ if (compressionAlg != CompressionType.UNCOMPRESSED) {
+ compressedByteBuffer =
+ ByteBuffer.allocate(
+ compressor.getMaxBytesForCompression(
+
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
Review Comment:
This is a pessimistic assumption. It is okay, but in the target scenario
where the compression algorithm works well, it may waste some memory because
the compressed buffer should always be much smaller than the original one.
Maybe the utilization of the buffer should be monitored, so that we can resize
it based on that utilization.
For example, if the buffer utilization stays under 30% for a long time,
we can shrink it to 40% of its current capacity, saving 60% of the memory.
Please just leave a TODO here if possible.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java:
##########
@@ -153,10 +157,20 @@ public static WALMetaData readFromWALFile(File logFile,
FileChannel channel) thr
return metaData;
}
- private static String readTailMagic(FileChannel channel) throws IOException {
+ private static boolean isValidMagicString(FileChannel channel) throws
IOException {
ByteBuffer magicStringBytes =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
channel.read(magicStringBytes, channel.size() -
WALWriter.MAGIC_STRING_BYTES);
magicStringBytes.flip();
- return new String(magicStringBytes.array());
+ String magicString = new String(magicStringBytes.array());
+ return magicString.equals(WALWriter.MAGIC_STRING)
+ || magicString.startsWith(WALWriter.MAGIC_STRING_V1);
Review Comment:
Why use startsWith for V1 instead of equals?
##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.compression;
+
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
+import org.apache.iotdb.db.utils.constant.TestConstant;
+
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+public class WALCompressionTest {
+ private final File walFile =
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ WALFileUtils.getLogFileName(0, 0,
WALFileStatus.CONTAINS_SEARCH_INDEX)));
+
+ @Before
+ public void setUp() throws IOException {
+ if (walFile.getParentFile().exists()) {
+ Files.delete(walFile.getParentFile().toPath());
+ }
+ Files.createDirectory(walFile.getParentFile().toPath());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (walFile.getParentFile().exists()) {
+ Files.delete(walFile.getParentFile().toPath());
+ }
+ }
+
+ public void testSkipToGivenPosition() {}
Review Comment:
Please complete the tests: testSkipToGivenPosition is currently empty and is not annotated with @Test.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Objects;
+
+public class WALInputStream extends InputStream implements AutoCloseable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(WALInputStream.class);
+ private final FileChannel channel;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES +
1);
+ private final ByteBuffer compressedHeader =
ByteBuffer.allocate(Integer.BYTES);
+ private ByteBuffer dataBuffer = null;
+ private long fileSize;
+ File logFile;
+ private long endOffset = -1;
+
+ enum FileVersion {
+ V1,
+ V2,
+ UNKNOWN
+ };
+
+ FileVersion version;
+
+ public WALInputStream(File logFile) throws IOException {
+ channel = FileChannel.open(logFile.toPath());
+ fileSize = channel.size();
+ analyzeFileVersion();
+ getEndOffset();
+ this.logFile = logFile;
+ }
+
+ private void getEndOffset() throws IOException {
+ if (channel.size() < WALWriter.MAGIC_STRING_BYTES + Integer.BYTES) {
+ endOffset = channel.size();
+ return;
+ }
+ ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
+ long position;
+ try {
+ if (version == FileVersion.V2) {
+ ByteBuffer magicStringBuffer =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+ channel.read(magicStringBuffer, channel.size() -
WALWriter.MAGIC_STRING_BYTES);
+ magicStringBuffer.flip();
+ if (!new
String(magicStringBuffer.array()).equals(WALWriter.MAGIC_STRING)) {
+ // this is a broken wal file
+ endOffset = channel.size();
+ return;
+ }
+ position = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES;
+ } else {
+ ByteBuffer magicStringBuffer =
+ ByteBuffer.allocate(WALWriter.MAGIC_STRING_V1.getBytes().length);
+ channel.read(
+ magicStringBuffer, channel.size() -
WALWriter.MAGIC_STRING_V1.getBytes().length);
+ magicStringBuffer.flip();
+ if (!new
String(magicStringBuffer.array()).equals(WALWriter.MAGIC_STRING_V1)) {
+ // this is a broken wal file
+ endOffset = channel.size();
+ return;
+ }
+ position = channel.size() -
WALWriter.MAGIC_STRING_V1.getBytes().length - Integer.BYTES;
+ }
+ channel.read(metadataSizeBuf, position);
+ metadataSizeBuf.flip();
+ int metadataSize = metadataSizeBuf.getInt();
+ endOffset = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES - metadataSize - 1;
+ } finally {
+ channel.position(WALWriter.MAGIC_STRING_BYTES);
+ }
+ }
+
+ private void analyzeFileVersion() throws IOException {
+ if (channel.size() < WALWriter.MAGIC_STRING_BYTES) {
+ version = FileVersion.UNKNOWN;
+ return;
+ }
+ if (isCurrentVersion()) {
+ this.version = FileVersion.V2;
+ return;
+ }
+ this.version = FileVersion.V1;
+ }
+
+ private boolean isCurrentVersion() throws IOException {
+ channel.position(0);
+ ByteBuffer buffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+ channel.read(buffer);
+ return new String(buffer.array()).equals(WALWriter.MAGIC_STRING);
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (Objects.isNull(dataBuffer) || dataBuffer.position() >=
dataBuffer.limit()) {
+ loadNextSegment();
+ }
+ return dataBuffer.get() & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (Objects.isNull(dataBuffer) || dataBuffer.position() >=
dataBuffer.limit()) {
+ loadNextSegment();
+ }
+ if (dataBuffer.remaining() >= len) {
+ dataBuffer.get(b, off, len);
+ return len;
+ }
+ int toBeRead = len;
+ while (toBeRead > 0) {
+ int remaining = dataBuffer.remaining();
+ int bytesRead = Math.min(remaining, toBeRead);
+ dataBuffer.get(b, off, bytesRead);
+ off += bytesRead;
+ toBeRead -= bytesRead;
+ if (toBeRead > 0) {
+ loadNextSegment();
+ }
+ }
+ return len;
+ }
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ dataBuffer = null;
+ }
+
+ @Override
+ public int available() throws IOException {
+ long size = (endOffset - channel.position());
+ if (!Objects.isNull(dataBuffer)) {
+ size += dataBuffer.limit() - dataBuffer.position();
+ }
+ return (int) size;
+ }
+
+ private void loadNextSegment() throws IOException {
+ if (channel.position() >= endOffset) {
+ throw new IOException("End of file");
+ }
+ if (version == FileVersion.V2) {
+ loadNextSegmentV2();
+ } else if (version == FileVersion.V1) {
+ loadNextSegmentV1();
+ } else {
+ tryLoadSegment();
+ }
+ }
+
+ private void loadNextSegmentV1() throws IOException {
+ // just read raw data as input
+ if (channel.position() >= fileSize) {
+ throw new IOException("Unexpected end of file");
+ }
+ if (Objects.isNull(dataBuffer)) {
+ // read 128 KB
+ dataBuffer = ByteBuffer.allocate(128 * 1024);
+ }
+ dataBuffer.clear();
+ channel.read(dataBuffer);
+ dataBuffer.flip();
+ }
+
+ private void loadNextSegmentV2() throws IOException {
+ headerBuffer.clear();
+ if (channel.read(headerBuffer) != Integer.BYTES + 1) {
+ throw new IOException("Unexpected end of file");
+ }
+ // compressionType originalSize compressedSize
+ headerBuffer.flip();
+ CompressionType compressionType =
CompressionType.deserialize(headerBuffer.get());
+ int dataBufferSize = headerBuffer.getInt();
+ if (compressionType != CompressionType.UNCOMPRESSED) {
+ compressedHeader.clear();
+ if (channel.read(compressedHeader) != Integer.BYTES) {
+ throw new IOException("Unexpected end of file");
+ }
+ compressedHeader.flip();
+ int uncompressedSize = compressedHeader.getInt();
+ dataBuffer = ByteBuffer.allocateDirect(uncompressedSize);
+ ByteBuffer compressedData = ByteBuffer.allocateDirect(dataBufferSize);
Review Comment:
Make sure the direct buffers are properly released.
If the previous dataBuffer is large enough, reuse it directly.
The same applies to compressedData.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Objects;
+
+public class WALInputStream extends InputStream implements AutoCloseable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(WALInputStream.class);
+ private final FileChannel channel;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES +
1);
+ private final ByteBuffer compressedHeader =
ByteBuffer.allocate(Integer.BYTES);
+ private ByteBuffer dataBuffer = null;
+ private long fileSize;
+ File logFile;
+ private long endOffset = -1;
+
+ enum FileVersion {
+ V1,
+ V2,
+ UNKNOWN
+ };
+
+ FileVersion version;
+
+ public WALInputStream(File logFile) throws IOException {
+ channel = FileChannel.open(logFile.toPath());
+ fileSize = channel.size();
+ analyzeFileVersion();
+ getEndOffset();
+ this.logFile = logFile;
+ }
+
+ private void getEndOffset() throws IOException {
+ if (channel.size() < WALWriter.MAGIC_STRING_BYTES + Integer.BYTES) {
+ endOffset = channel.size();
+ return;
+ }
+ ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
+ long position;
+ try {
+ if (version == FileVersion.V2) {
+ ByteBuffer magicStringBuffer =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+ channel.read(magicStringBuffer, channel.size() -
WALWriter.MAGIC_STRING_BYTES);
+ magicStringBuffer.flip();
+ if (!new
String(magicStringBuffer.array()).equals(WALWriter.MAGIC_STRING)) {
+ // this is a broken wal file
+ endOffset = channel.size();
+ return;
+ }
+ position = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES;
+ } else {
+ ByteBuffer magicStringBuffer =
+ ByteBuffer.allocate(WALWriter.MAGIC_STRING_V1.getBytes().length);
+ channel.read(
+ magicStringBuffer, channel.size() -
WALWriter.MAGIC_STRING_V1.getBytes().length);
+ magicStringBuffer.flip();
+ if (!new
String(magicStringBuffer.array()).equals(WALWriter.MAGIC_STRING_V1)) {
+ // this is a broken wal file
+ endOffset = channel.size();
+ return;
+ }
+ position = channel.size() -
WALWriter.MAGIC_STRING_V1.getBytes().length - Integer.BYTES;
+ }
+ channel.read(metadataSizeBuf, position);
+ metadataSizeBuf.flip();
+ int metadataSize = metadataSizeBuf.getInt();
+ endOffset = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES - metadataSize - 1;
+ } finally {
+ channel.position(WALWriter.MAGIC_STRING_BYTES);
+ }
+ }
+
+ private void analyzeFileVersion() throws IOException {
+ if (channel.size() < WALWriter.MAGIC_STRING_BYTES) {
+ version = FileVersion.UNKNOWN;
+ return;
+ }
+ if (isCurrentVersion()) {
+ this.version = FileVersion.V2;
+ return;
+ }
+ this.version = FileVersion.V1;
+ }
+
+ private boolean isCurrentVersion() throws IOException {
+ channel.position(0);
+ ByteBuffer buffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+ channel.read(buffer);
+ return new String(buffer.array()).equals(WALWriter.MAGIC_STRING);
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (Objects.isNull(dataBuffer) || dataBuffer.position() >=
dataBuffer.limit()) {
+ loadNextSegment();
+ }
+ return dataBuffer.get() & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (Objects.isNull(dataBuffer) || dataBuffer.position() >=
dataBuffer.limit()) {
+ loadNextSegment();
+ }
+ if (dataBuffer.remaining() >= len) {
+ dataBuffer.get(b, off, len);
+ return len;
+ }
+ int toBeRead = len;
+ while (toBeRead > 0) {
+ int remaining = dataBuffer.remaining();
+ int bytesRead = Math.min(remaining, toBeRead);
+ dataBuffer.get(b, off, bytesRead);
+ off += bytesRead;
+ toBeRead -= bytesRead;
+ if (toBeRead > 0) {
+ loadNextSegment();
+ }
+ }
+ return len;
+ }
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ dataBuffer = null;
+ }
+
+ @Override
+ public int available() throws IOException {
+ long size = (endOffset - channel.position());
+ if (!Objects.isNull(dataBuffer)) {
+ size += dataBuffer.limit() - dataBuffer.position();
+ }
+ return (int) size;
+ }
+
+ private void loadNextSegment() throws IOException {
+ if (channel.position() >= endOffset) {
+ throw new IOException("End of file");
+ }
+ if (version == FileVersion.V2) {
+ loadNextSegmentV2();
+ } else if (version == FileVersion.V1) {
+ loadNextSegmentV1();
+ } else {
+ tryLoadSegment();
+ }
+ }
+
+ private void loadNextSegmentV1() throws IOException {
+ // just read raw data as input
+ if (channel.position() >= fileSize) {
+ throw new IOException("Unexpected end of file");
+ }
+ if (Objects.isNull(dataBuffer)) {
+ // read 128 KB
+ dataBuffer = ByteBuffer.allocate(128 * 1024);
+ }
+ dataBuffer.clear();
+ channel.read(dataBuffer);
+ dataBuffer.flip();
+ }
+
+ private void loadNextSegmentV2() throws IOException {
+ headerBuffer.clear();
+ if (channel.read(headerBuffer) != Integer.BYTES + 1) {
+ throw new IOException("Unexpected end of file");
+ }
+ // compressionType originalSize compressedSize
+ headerBuffer.flip();
+ CompressionType compressionType =
CompressionType.deserialize(headerBuffer.get());
+ int dataBufferSize = headerBuffer.getInt();
+ if (compressionType != CompressionType.UNCOMPRESSED) {
+ compressedHeader.clear();
+ if (channel.read(compressedHeader) != Integer.BYTES) {
+ throw new IOException("Unexpected end of file");
+ }
+ compressedHeader.flip();
+ int uncompressedSize = compressedHeader.getInt();
+ dataBuffer = ByteBuffer.allocateDirect(uncompressedSize);
+ ByteBuffer compressedData = ByteBuffer.allocateDirect(dataBufferSize);
+ if (channel.read(compressedData) != dataBufferSize) {
+ throw new IOException("Unexpected end of file");
+ }
+ compressedData.flip();
+ IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(compressionType);
+ dataBuffer.clear();
+ unCompressor.uncompress(compressedData, dataBuffer);
+ } else {
+ dataBuffer = ByteBuffer.allocate(dataBufferSize);
+ if (channel.read(dataBuffer) != dataBufferSize) {
+ throw new IOException("Unexpected end of file");
+ }
+ }
+ dataBuffer.flip();
+ }
+
+ private void tryLoadSegment() throws IOException {
+ long originPosition = channel.position();
+ try {
+ loadNextSegmentV2();
+ version = FileVersion.V2;
+ } catch (Throwable e) {
+ // failed to load in V2 way, try in V1 way
+ logger.warn("Failed to load WAL segment in V2 way, try in V1 way", e);
+ channel.position(originPosition);
+ }
+
+ if (version == FileVersion.UNKNOWN) {
+ loadNextSegmentV1();
+ version = FileVersion.V1;
+ }
+ }
+
+ public void skipToGivenPosition(long pos) throws IOException {
+ if (version == FileVersion.V2) {
+ channel.position(WALWriter.MAGIC_STRING_BYTES);
+ ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES + Integer.BYTES);
+ long posRemain = pos;
+ int currSegmentSize = 0;
+ while (posRemain > 0) {
+ buffer.clear();
+ channel.read(buffer);
+ buffer.flip();
+ buffer.get();
+ currSegmentSize = buffer.getInt();
+ if (posRemain >= currSegmentSize) {
+ posRemain -= currSegmentSize;
+ } else {
+ break;
+ }
+ }
Review Comment:
Where is the segment skipped? I only see the header being read.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java:
##########
@@ -79,7 +84,7 @@ public int serializedSize() {
+ (memTablesId.isEmpty() ? 0 : Integer.BYTES + memTablesId.size() *
Long.BYTES);
}
- public void serialize(ByteBuffer buffer) {
+ public void serialize(File file, ByteBuffer buffer) {
buffer.putLong(firstSearchIndex);
buffer.putInt(buffersSize.size());
for (int size : buffersSize) {
Review Comment:
Where is the new `file` parameter used?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java:
##########
@@ -135,6 +136,26 @@ public FileChannel openReadFileChannel() throws
IOException {
}
}
+ public WALInputStream openReadFileStream() throws IOException {
+ if (isInSealedFile()) {
+ walFile = walNode.getWALFile(walFileVersionId);
+ return new WALInputStream(walFile);
+ } else {
+ try {
+ walFile = walNode.getWALFile(walFileVersionId);
+ return new WALInputStream(walFile);
+ } catch (IOException e) {
+ // unsealed file may be renamed after sealed, so we should try again
+ if (isInSealedFile()) {
+ walFile = walNode.getWALFile(walFileVersionId);
+ return new WALInputStream(walFile);
+ } else {
+ throw e;
+ }
+ }
+ }
Review Comment:
It seems the `else` branch alone is sufficient: the `if (isInSealedFile())` branch above does essentially the same thing (get the WAL file and open a `WALInputStream` on it), just without the retry.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java:
##########
@@ -100,11 +101,11 @@ ByteBuffer read() throws IOException {
if (!canRead()) {
throw new IOException("Target file hasn't been specified.");
}
- try (FileChannel channel = openReadFileChannel()) {
+ try (WALInputStream is = openReadFileStream()) {
+ is.skipToGivenPosition(position);
ByteBuffer buffer = ByteBuffer.allocate(size);
- channel.position(position);
- channel.read(buffer);
- buffer.clear();
+ is.read(buffer);
+ buffer.flip();
return buffer;
}
Review Comment:
I am concerned about the efficiency of opening a new stream for every WALEntry read. Maybe the stream should be reused somehow.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]