This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-28028
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28028 by this push:
new 5e5c210bf85 HBASE-28028 Read all compressed bytes to a byte array before submitting them to decompressor
5e5c210bf85 is described below
commit 5e5c210bf8553b9a9e09adc7aedf6a10fb5953cc
Author: Duo Zhang
AuthorDate: Thu Aug 17 22:54:19 2023 +0800
HBASE-28028 Read all compressed bytes to a byte array before submitting them to decompressor
---
.../hbase/regionserver/wal/CompressionContext.java | 8 +--
...LDecompressionBoundedDelegatingInputStream.java | 78 ++
2 files changed, 40 insertions(+), 46 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index 73cf4821db0..8f6d1792954 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -28,7 +28,6 @@ import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.Dictionary;
@@ -77,7 +76,7 @@ public class CompressionContext {
private final Compression.Algorithm algorithm;
private Compressor compressor;
private Decompressor decompressor;
-private BoundedDelegatingInputStream lowerIn;
+private WALDecompressionBoundedDelegatingInputStream lowerIn;
private ByteArrayOutputStream lowerOut;
private InputStream compressedIn;
private OutputStream compressedOut;
@@ -115,13 +114,11 @@ public class CompressionContext {
// Create the input streams here the first time around.
if (compressedIn == null) {
-lowerIn = new BoundedDelegatingInputStream(in, inLength);
+lowerIn = new WALDecompressionBoundedDelegatingInputStream(in);
if (decompressor == null) {
decompressor = algorithm.getDecompressor();
}
compressedIn = algorithm.createDecompressionStream(lowerIn,
decompressor, IO_BUFFER_SIZE);
- } else {
-lowerIn.setDelegate(in, inLength);
}
if (outLength == 0) {
// The BufferedInputStream will return earlier and skip reading anything if outLength == 0,
@@ -131,6 +128,7 @@ public class CompressionContext {
// such as data loss when splitting wal or replicating wal.
IOUtils.skipFully(in, inLength);
} else {
+lowerIn.resetLimit(inLength);
IOUtils.readFully(compressedIn, outArray, outOffset, outLength);
}
}
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
similarity index 57%
rename from
hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
rename to
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
index 2a6db09050c..688a5053c6e 100644
---
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,75 +15,76 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.io;
+package org.apache.hadoop.hbase.regionserver.wal;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import org.apache.commons.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * This is a stream that will only supply bytes from its delegate up to a certain limit. When there
- * is an attempt to set the position beyond that it will signal that the input is finished.
+ * This class is only used by WAL ValueCompressor for decompression.
+ *
+ * WARNING: The implementation is very tricky and does not follow typical
+ * InputStream pattern, so do not use it in any other places.
*/
@InterfaceAudience.Private
-public class BoundedDelegatingInputStream extends