[hbase] branch HBASE-28028 updated: HBASE-28028 Read all compressed bytes to a byte array before submitting them to decompressor

2023-08-17 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-28028
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-28028 by this push:
 new 5e5c210bf85 HBASE-28028 Read all compressed bytes to a byte array 
before submitting them to decompressor
5e5c210bf85 is described below

commit 5e5c210bf8553b9a9e09adc7aedf6a10fb5953cc
Author: Duo Zhang 
AuthorDate: Thu Aug 17 22:54:19 2023 +0800

HBASE-28028 Read all compressed bytes to a byte array before submitting 
them to decompressor
---
 .../hbase/regionserver/wal/CompressionContext.java |  8 +--
 ...LDecompressionBoundedDelegatingInputStream.java | 78 ++
 2 files changed, 40 insertions(+), 46 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index 73cf4821db0..8f6d1792954 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.Dictionary;
@@ -77,7 +76,7 @@ public class CompressionContext {
 private final Compression.Algorithm algorithm;
 private Compressor compressor;
 private Decompressor decompressor;
-private BoundedDelegatingInputStream lowerIn;
+private WALDecompressionBoundedDelegatingInputStream lowerIn;
 private ByteArrayOutputStream lowerOut;
 private InputStream compressedIn;
 private OutputStream compressedOut;
@@ -115,13 +114,11 @@ public class CompressionContext {
 
   // Create the input streams here the first time around.
   if (compressedIn == null) {
-lowerIn = new BoundedDelegatingInputStream(in, inLength);
+lowerIn = new WALDecompressionBoundedDelegatingInputStream(in);
 if (decompressor == null) {
   decompressor = algorithm.getDecompressor();
 }
 compressedIn = algorithm.createDecompressionStream(lowerIn, decompressor, IO_BUFFER_SIZE);
-  } else {
-lowerIn.setDelegate(in, inLength);
   }
   if (outLength == 0) {
 // The BufferedInputStream will return earlier and skip reading anything if outLength == 0,
@@ -131,6 +128,7 @@ public class CompressionContext {
 // such as data loss when splitting wal or replicating wal.
 IOUtils.skipFully(in, inLength);
   } else {
+lowerIn.resetLimit(inLength);
 IOUtils.readFully(compressedIn, outArray, outOffset, outLength);
   }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
similarity index 57%
rename from hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
index 2a6db09050c..688a5053c6e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDecompressionBoundedDelegatingInputStream.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,75 +15,76 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.io;
+package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import org.apache.commons.io.IOUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is a stream that will only supply bytes from its delegate up to a certain limit. When there
- * is an attempt to set the position beyond that it will signal that the input is finished.
+ * This class is only used by WAL ValueCompressor for decompression.
+ * 
+ * WARNING: The implementation is very tricky and does not follow typical
+ * InputStream pattern, so do not use it in any other places.
  */
 @InterfaceAudience.Private
-public class BoundedDelegatingInputStream extends 

[hbase] branch HBASE-28028 updated: HBASE-28028 Read all compressed bytes to a byte array before submitting them to decompressor

2023-08-17 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-28028
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-28028 by this push:
 new 1417faeb952 HBASE-28028 Read all compressed bytes to a byte array 
before submitting them to decompressor
1417faeb952 is described below

commit 1417faeb952c97421ac5b150f2c16b9a8122cec1
Author: Duo Zhang 
AuthorDate: Thu Aug 17 17:05:05 2023 +0800

HBASE-28028 Read all compressed bytes to a byte array before submitting 
them to decompressor
---
 .../apache/hadoop/hbase/regionserver/wal/CompressionContext.java| 6 ++
 1 file changed, 6 insertions(+)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index 73cf4821db0..6a2cd91 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -131,6 +132,11 @@ public class CompressionContext {
 // such as data loss when splitting wal or replicating wal.
 IOUtils.skipFully(in, inLength);
   } else {
+// FIXME: this is just used to confirm whether providing partial compressed data to decompressor is a problem
+// if confirmed, should try find other more efficient way to do this.
+byte[] compressed = new byte[inLength];
+IOUtils.readFully(in, compressed);
+lowerIn.setDelegate(new ByteArrayInputStream(compressed));
 IOUtils.readFully(compressedIn, outArray, outOffset, outLength);
   }
 }