This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 2d57595  HBASE-23195 FSDataInputStreamWrapper unbuffer can NOT invoke 
the classes that NOT implements CanUnbuffer but its parents class implements 
CanUnbuffer
2d57595 is described below

commit 2d57595f5481e56548afe8641a33dbce6663c79e
Author: zhaoym6 <zhao...@asiainfo.com>
AuthorDate: Tue Oct 22 21:57:24 2019 +0800

    HBASE-23195 FSDataInputStreamWrapper unbuffer can NOT invoke the classes 
that NOT implements CanUnbuffer but its parents class implements CanUnbuffer
    
    Closes #746
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    Signed-off-by: Josh Elser <els...@apache.org>
---
 .../hadoop/hbase/io/FSDataInputStreamWrapper.java  |  41 ++----
 .../hbase/io/TestFSDataInputStreamWrapper.java     | 139 +++++++++++++++++++++
 2 files changed, 150 insertions(+), 30 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index 67bca84..c1f9a7d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -20,11 +20,10 @@ package org.apache.hadoop.hbase.io;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -91,11 +90,6 @@ public class FSDataInputStreamWrapper implements Closeable {
   // reads without hbase checksum verification.
   private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1);
 
-  private Boolean instanceOfCanUnbuffer = null;
-  // Using reflection to get org.apache.hadoop.fs.CanUnbuffer#unbuffer method 
to avoid compilation
-  // errors against Hadoop pre 2.6.4 and 2.7.1 versions.
-  private Method unbuffer = null;
-
   private final static ReadStatistics readStatistics = new ReadStatistics();
 
   private static class ReadStatistics {
@@ -105,6 +99,9 @@ public class FSDataInputStreamWrapper implements Closeable {
     long totalZeroCopyBytesRead;
   }
 
+  private Boolean instanceOfCanUnbuffer = null;
+  private CanUnbuffer unbuffer = null;
+
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException 
{
     this(fs, path, false, -1L);
   }
@@ -331,39 +328,23 @@ public class FSDataInputStreamWrapper implements 
Closeable {
       if (this.instanceOfCanUnbuffer == null) {
         // To ensure we compute whether the stream is instance of CanUnbuffer 
only once.
         this.instanceOfCanUnbuffer = false;
-        Class<?>[] streamInterfaces = streamClass.getInterfaces();
-        for (Class c : streamInterfaces) {
-          if 
(c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) {
-            try {
-              this.unbuffer = streamClass.getDeclaredMethod("unbuffer");
-            } catch (NoSuchMethodException | SecurityException e) {
-              if (isLogTraceEnabled) {
-                LOG.trace("Failed to find 'unbuffer' method in class " + 
streamClass
-                    + " . So there may be a TCP socket connection "
-                    + "left open in CLOSE_WAIT state.", e);
-              }
-              return;
-            }
-            this.instanceOfCanUnbuffer = true;
-            break;
-          }
+        if (wrappedStream instanceof CanUnbuffer) {
+          this.unbuffer = (CanUnbuffer) wrappedStream;
+          this.instanceOfCanUnbuffer = true;
         }
       }
       if (this.instanceOfCanUnbuffer) {
         try {
-          this.unbuffer.invoke(wrappedStream);
-        } catch (IllegalAccessException | IllegalArgumentException | 
InvocationTargetException e) {
+          this.unbuffer.unbuffer();
+        } catch (UnsupportedOperationException e){
           if (isLogTraceEnabled) {
             LOG.trace("Failed to invoke 'unbuffer' method in class " + 
streamClass
-                + " . So there may be a TCP socket connection left open in 
CLOSE_WAIT state.", e);
+                + " . So there may be the stream does not support 
unbuffering.", e);
           }
         }
       } else {
         if (isLogTraceEnabled) {
-          LOG.trace("Failed to find 'unbuffer' method in class " + streamClass
-              + " . So there may be a TCP socket connection "
-              + "left open in CLOSE_WAIT state. For more details check "
-              + "https://issues.apache.org/jira/browse/HBASE-9393";);
+          LOG.trace("Failed to find 'unbuffer' method in class " + 
streamClass);
         }
       }
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFSDataInputStreamWrapper.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFSDataInputStreamWrapper.java
new file mode 100644
index 0000000..22b6c62
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFSDataInputStreamWrapper.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestFSDataInputStreamWrapper {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFSDataInputStreamWrapper.class);
+
+  @Test
+  public void testUnbuffer() throws Exception {
+    InputStream pc = new ParentClass();
+    FSDataInputStreamWrapper fsdisw1 =
+      new FSDataInputStreamWrapper(new FSDataInputStream(pc));
+    fsdisw1.unbuffer();
+    // parent class should be true
+    assertTrue(((ParentClass)pc).getIsCallUnbuffer());
+    fsdisw1.close();
+
+    InputStream cc1 = new ChildClass1();
+    FSDataInputStreamWrapper fsdisw2 =
+      new FSDataInputStreamWrapper(new FSDataInputStream(cc1));
+    fsdisw2.unbuffer();
+    // child1 class should be true
+    assertTrue(((ChildClass1)cc1).getIsCallUnbuffer());
+    fsdisw2.close();
+  }
+
+  private class ParentClass extends FSInputStream
+      implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
+                 HasEnhancedByteBufferAccess, CanUnbuffer {
+
+    public boolean isCallUnbuffer = false;
+
+    public boolean getIsCallUnbuffer(){
+      return isCallUnbuffer;
+    }
+
+    @Override
+    public void unbuffer() {
+      isCallUnbuffer =  true;
+    }
+
+    @Override
+    public int read() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public ByteBuffer read(ByteBufferPool paramByteBufferPool,
+        int paramInt, EnumSet<ReadOption> paramEnumSet)
+            throws IOException, UnsupportedOperationException {
+      return null;
+    }
+
+    @Override
+    public void releaseBuffer(ByteBuffer paramByteBuffer) {
+
+    }
+
+    @Override
+    public void setReadahead(Long paramLong)
+        throws IOException, UnsupportedOperationException {
+
+    }
+
+    @Override
+    public void setDropBehind(Boolean paramBoolean)
+        throws IOException, UnsupportedOperationException {
+
+    }
+
+    @Override
+    public int read(ByteBuffer paramByteBuffer) throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void seek(long paramLong) throws IOException {
+
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public boolean seekToNewSource(long paramLong) throws IOException {
+      return false;
+    }
+  }
+
+  private class ChildClass1 extends ParentClass{
+    @Override
+    public void unbuffer() {
+      isCallUnbuffer = true;
+    }
+  }
+}

Reply via email to