This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 7851df7  HBASE-25712 Backport of HBASE-25692 to branch-1
7851df7 is described below

commit 7851df7927f26f86f7aa9271496a0ac653c5b91e
Author: Josh Elser <jel...@cloudera.com>
AuthorDate: Mon Mar 29 20:28:25 2021 -0400

    HBASE-25712 Backport of HBASE-25692 to branch-1
    
    HBASE-25692 ensures that we do not leak any InputStream (socket)
    which would otherwise remain in CLOSE_WAIT until the Java process
    exits. These orphaned sockets would eventually exhaust Linux network
    and file-descriptor limits.
    
    Closes #3104
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
---
 .../org/apache/hadoop/hbase/wal/WALFactory.java    |  56 +++++----
 .../apache/hadoop/hbase/wal/FileSystemProxy.java   | 106 ++++++++++++++++
 .../apache/hadoop/hbase/wal/TestWALFactory.java    | 137 +++++++++++++++++++++
 3 files changed, 274 insertions(+), 25 deletions(-)

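The heart of the change is easy to miss in the diff below: the catch around
reader initialization is widened from IOException to Exception, so the freshly
opened stream is closed before any failure propagates, including unchecked
failures such as a WAL cell codec that cannot be instantiated. A minimal,
self-contained sketch of that close-on-any-failure pattern follows; the names
(CloseOnFailure, openAndInit, initReader) are placeholders for illustration,
not the actual WALFactory API:

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public final class CloseOnFailure {
      // Stands in for reader.init(...), which can fail with an unchecked
      // exception (e.g. reflective codec instantiation blowing up).
      static void initReader(InputStream in) throws IOException {
      }

      public static InputStream openAndInit(String path) throws IOException {
        InputStream stream = null;
        try {
          stream = new FileInputStream(path); // stands in for fs.open(path)
          initReader(stream);
          return stream;
        } catch (Exception e) { // was "catch (IOException e)" before the fix
          if (stream != null) {
            try {
              // Close eagerly: a leaked stream pins its socket in CLOSE_WAIT
              // until the JVM exits.
              stream.close();
            } catch (IOException suppressed) {
              // Best effort; prefer surfacing the original failure.
            }
          }
          throw e instanceof IOException ? (IOException) e : new IOException(e);
        }
      }
    }
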
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 8a82bd0..47d3c54 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -311,7 +311,7 @@ public class WALFactory {
             reader.init(fs, path, conf, stream);
             return reader;
           }
-        } catch (IOException e) {
+        } catch (Exception e) {
           if (stream != null) {
             try {
               stream.close();
@@ -328,33 +328,39 @@ public class WALFactory {
               LOG.debug("exception details", exception);
             }
           }
-          String msg = e.getMessage();
-          if (msg != null && (msg.contains("Cannot obtain block length")
-              || msg.contains("Could not obtain the last block")
-              || msg.matches("Blocklist for [^ ]* has changed.*"))) {
-            if (++nbAttempt == 1) {
-              LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
-            }
-            if (reporter != null && !reporter.progress()) {
-              throw new InterruptedIOException("Operation is cancelled");
-            }
-            if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
-              LOG.error("Can't open after " + nbAttempt + " attempts and "
-                  + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
-            } else {
-              try {
-                Thread.sleep(nbAttempt < 3 ? 500 : 1000);
-                continue; // retry
-              } catch (InterruptedException ie) {
-                InterruptedIOException iioe = new InterruptedIOException();
-                iioe.initCause(ie);
-                throw iioe;
+          if (e instanceof IOException) {
+            String msg = e.getMessage();
+            if (msg != null && (msg.contains("Cannot obtain block length")
+                || msg.contains("Could not obtain the last block")
+                || msg.matches("Blocklist for [^ ]* has changed.*"))) {
+              if (++nbAttempt == 1) {
+                LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
+              }
+              if (reporter != null && !reporter.progress()) {
+                throw new InterruptedIOException("Operation is cancelled");
               }
+              if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
+                LOG.error("Can't open after " + nbAttempt + " attempts and "
+                    + (EnvironmentEdgeManager.currentTime() - startWaiting)
+                    + "ms " + " for " + path);
+              } else {
+                try {
+                  Thread.sleep(nbAttempt < 3 ? 500 : 1000);
+                  continue; // retry
+                } catch (InterruptedException ie) {
+                  InterruptedIOException iioe = new InterruptedIOException();
+                  iioe.initCause(ie);
+                  throw iioe;
+                }
+              }
+              throw new LeaseNotRecoveredException(e);
+            } else {
+              throw e;
             }
-            throw new LeaseNotRecoveredException(e);
-          } else {
-            throw e;
           }
+
+          // Rethrow the original exception if we are not retrying due to HDFS-isms.
+          throw e;
         }
       }
     } catch (IOException ie) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java
new file mode 100644
index 0000000..2346ef0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Create a non-abstract "proxy" for FileSystem because FileSystem is an
+ * abstract class and not an interface. Only interfaces can be used with the
+ * Java Proxy class to override functionality via an InvocationHandler.
+ *
+ */
+public class FileSystemProxy extends FileSystem {
+  private final FileSystem real;
+
+  public FileSystemProxy(FileSystem real) {
+    this.real = real;
+  }
+
+  @Override
+  public FSDataInputStream open(Path p) throws IOException {
+    return real.open(p);
+  }
+
+  @Override
+  public URI getUri() {
+    return real.getUri();
+  }
+
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    return real.open(f, bufferSize);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
+      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+    return real.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+      throws IOException {
+    return real.append(f, bufferSize, progress);
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    return real.rename(src, dst);
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    return real.delete(f, recursive);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+    return real.listStatus(f);
+  }
+
+  @Override
+  public void setWorkingDirectory(Path new_dir) {
+    real.setWorkingDirectory(new_dir);
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return real.getWorkingDirectory();
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    return real.mkdirs(f, permission);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    return real.getFileStatus(f);
+  }
+}
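
As the class javadoc above notes, java.lang.reflect.Proxy cannot be used for
this: Proxy only generates implementations of interfaces, while FileSystem is
an abstract class, which is why the hand-written delegate exists. A short
illustration of the constraint (this snippet is not part of the patch):

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;

    import org.apache.hadoop.fs.FileSystem;

    public class WhyNotDynamicProxy {
      public static void main(String[] args) {
        InvocationHandler handler = new InvocationHandler() {
          @Override
          public Object invoke(Object proxy, Method method, Object[] methodArgs) {
            return null;
          }
        };
        // Fails with IllegalArgumentException: every element of the interfaces
        // array must be an interface type, and FileSystem is a class.
        Proxy.newProxyInstance(FileSystem.class.getClassLoader(),
            new Class<?>[] { FileSystem.class }, handler);
      }
    }

The test below instead subclasses FileSystemProxy anonymously to intercept
open() and record every stream the WAL reader opens.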
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 31717b6..0b4e80b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -27,9 +27,14 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.net.BindException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,13 +53,16 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
 import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -714,6 +722,135 @@ public class TestWALFactory {
     }
   }
 
+  @Test
+  public void testReaderClosedOnBadCodec() throws IOException {
+    // Create our own Configuration and WALFactory to avoid breaking other test methods
+    Configuration confWithCodec = new Configuration(conf);
+    confWithCodec.setClass(
+        WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class, Codec.class);
+    WALFactory customFactory = new WALFactory(confWithCodec, null, currentTest.getMethodName());
+
+    // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by
+    // the FileSystem and know if close() was called on those InputStreams.
+    final List<InputStreamProxy> openedReaders = new ArrayList<>();
+    FileSystemProxy proxyFs = new FileSystemProxy(fs) {
+      @Override
+      public FSDataInputStream open(Path p) throws IOException {
+        InputStreamProxy is = new InputStreamProxy(super.open(p));
+        openedReaders.add(is);
+        return is;
+      }
+
+      @Override
+      public FSDataInputStream open(Path p, int blockSize) throws IOException {
+        InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize));
+        openedReaders.add(is);
+        return is;
+      }
+    };
+
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf(currentTest.getMethodName()));
+    htd.addFamily(new HColumnDescriptor(Bytes.toBytes("column")));
+
+    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
+        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+
+    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    for (HColumnDescriptor colDesc : htd.getColumnFamilies()) {
+      scopes.put(colDesc.getName(), 0);
+    }
+    byte[] row = Bytes.toBytes("row");
+    WAL.Reader reader = null;
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
+    try {
+      // Write one column in one edit.
+      WALEdit cols = new WALEdit();
+      cols.add(new KeyValue(row, Bytes.toBytes("column"),
+        Bytes.toBytes("0"), System.currentTimeMillis(), new byte[] { 0 }));
+      final WAL log = customFactory.getWAL(
+          hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
+      final long txid = log.append(htd, hri,
+        new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
+            mvcc),
+        cols, true);
+      // Sync the edit to the WAL
+      log.sync(txid);
+      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
+      log.completeCacheFlush(hri.getEncodedNameAsBytes());
+      log.shutdown();
+
+      // Inject our failure, object is constructed via reflection.
+      BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
+
+      // Now open a reader on the log which will throw an exception when
+      // we try to instantiate the custom Codec.
+      Path filename = DefaultWALProvider.getCurrentFileName(log);
+      try {
+        reader = customFactory.createReader(proxyFs, filename);
+        fail("Expected to see an exception when creating WAL reader");
+      } catch (Exception e) {
+        // Expected that we get an exception
+      }
+      // We should have exactly one reader
+      assertEquals(1, openedReaders.size());
+      // And that reader should be closed.
+      int numNotClosed = 0;
+      for (InputStreamProxy openedReader : openedReaders) {
+        if (!openedReader.isClosed.get()) {
+          numNotClosed++;
+        }
+      }
+      assertEquals("Should not find any open readers", 0, numNotClosed);
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
+  /**
+   * A proxy around FSDataInputStream which can report if close() was called.
+   */
+  private static class InputStreamProxy extends FSDataInputStream {
+    private final InputStream real;
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+    public InputStreamProxy(InputStream real) {
+      super(real);
+      this.real = real;
+    }
+
+    @Override
+    public void close() throws IOException {
+      isClosed.set(true);
+      real.close();
+    }
+  }
+
+  /**
+   * A custom WALCellCodec in which we can inject failure.
+   */
+  public static class BrokenWALCellCodec extends WALCellCodec {
+    static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);
+
+    static void maybeInjectFailure() {
+      if (THROW_FAILURE_ON_INIT.get()) {
+        throw new RuntimeException("Injected instantiation exception");
+      }
+    }
+
+    public BrokenWALCellCodec() {
+      super();
+      maybeInjectFailure();
+    }
+
+    public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
+      super(conf, compression);
+      maybeInjectFailure();
+    }
+  }
+
   static class DumbWALActionsListener extends WALActionsListener.Base {
     int increments = 0;
 

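
To run just the new test from a branch-1 checkout, an invocation along these
lines should work (a sketch assuming a standard Maven setup; adjust for your
environment):

    mvn test -pl hbase-server -Dtest=TestWALFactory#testReaderClosedOnBadCodec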