adamjshook closed pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem
URL: https://github.com/apache/accumulo/pull/369
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

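In short, the patch moves stream ownership to the callers: DfsLogger.readHeaderAndReturnStream now takes an already-opened FSDataInputStream instead of a VolumeManager and Path, and each caller opens the WAL itself inside try-with-resources so the underlying HDFS stream is closed on every exit path, including the early returns and exceptions that previously leaked it. Below is a minimal sketch (not part of the patch) of the caller-side pattern; readWal is a hypothetical helper name, and fs, conf, and the Accumulo/Hadoop types are the ones used in the touched classes:

  private void readWal(VolumeManager fs, Path srcPath, AccumuloConfiguration conf) throws IOException {
    try (final FSDataInputStream fsinput = fs.open(srcPath)) {
      DFSLoggerInputStreams streams;
      try {
        // The header reader no longer opens (or owns) the file; it only parses the header.
        streams = DfsLogger.readHeaderAndReturnStream(fsinput, conf);
      } catch (LogHeaderIncompleteException e) {
        // A tserver may have died before writing a complete header; treat the WAL as empty.
        return;
      }
      DataInputStream decrypting = streams.getDecryptingInputStream();
      try {
        while (true) {
          LogFileKey key = new LogFileKey();
          LogFileValue value = new LogFileValue();
          key.readFields(decrypting);
          value.readFields(decrypting);
          // ... process the entry ...
        }
      } catch (EOFException e) {
        // Reached the end of the WAL.
      }
    } // fsinput (and the wrapped HDFS stream) is closed here on every exit path.
  }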

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 395e18fba2..f2d36fd44c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -337,8 +337,7 @@ public DfsLogger(ServerResources conf, String filename, String meta) throws IOEx
     metaReference = meta;
   }
 
-  public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException {
-    FSDataInputStream input = fs.open(path);
+  public static DFSLoggerInputStreams readHeaderAndReturnStream(FSDataInputStream input, AccumuloConfiguration conf) throws IOException {
     DataInputStream decryptingInput = null;
 
     byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes(UTF_8);
@@ -414,7 +413,7 @@ public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs,
 
       }
     } catch (EOFException e) {
-      log.warn("Got EOFException trying to read WAL header information, 
assuming the rest of the file (" + path + ") has no data.");
+      log.warn("Got EOFException trying to read WAL header information, 
assuming the rest of the file has no data.");
       // A TabletServer might have died before the (complete) header was 
written
       throw new LogHeaderIncompleteException(e);
     }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 11097ceaea..ba5e488226 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -113,44 +113,46 @@ public void sort(String name, Path srcPath, String destPath) {
         // the following call does not throw an exception if the file/dir does not exist
         fs.deleteRecursively(new Path(destPath));
 
-        DFSLoggerInputStreams inputStreams;
-        try {
-          inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf);
-        } catch (LogHeaderIncompleteException e) {
-          log.warn("Could not read header from write-ahead log " + srcPath + 
". Not sorting.");
-          // Creating a 'finished' marker will cause recovery to proceed 
normally and the
-          // empty file will be correctly ignored downstream.
-          fs.mkdirs(new Path(destPath));
-          writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
-          fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
-          return;
-        }
+        try (final FSDataInputStream fsinput = fs.open(srcPath)) {
+          DFSLoggerInputStreams inputStreams;
+          try {
+            inputStreams = DfsLogger.readHeaderAndReturnStream(fsinput, conf);
+          } catch (LogHeaderIncompleteException e) {
+            log.warn("Could not read header from write-ahead log " + srcPath + 
". Not sorting.");
+            // Creating a 'finished' marker will cause recovery to proceed 
normally and the
+            // empty file will be correctly ignored downstream.
+            fs.mkdirs(new Path(destPath));
+            writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
+            fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+            return;
+          }
 
-        this.input = inputStreams.getOriginalInput();
-        this.decryptingInput = inputStreams.getDecryptingInputStream();
+          this.input = inputStreams.getOriginalInput();
+          this.decryptingInput = inputStreams.getDecryptingInputStream();
 
-        final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
-        Thread.currentThread().setName("Sorting " + name + " for recovery");
-        while (true) {
-          final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
-          try {
-            long start = input.getPos();
-            while (input.getPos() - start < bufferSize) {
-              LogFileKey key = new LogFileKey();
-              LogFileValue value = new LogFileValue();
-              key.readFields(decryptingInput);
-              value.readFields(decryptingInput);
-              buffer.add(new Pair<>(key, value));
+          final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
+          Thread.currentThread().setName("Sorting " + name + " for recovery");
+          while (true) {
+            final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+            try {
+              long start = input.getPos();
+              while (input.getPos() - start < bufferSize) {
+                LogFileKey key = new LogFileKey();
+                LogFileValue value = new LogFileValue();
+                key.readFields(decryptingInput);
+                value.readFields(decryptingInput);
+                buffer.add(new Pair<>(key, value));
+              }
+              writeBuffer(destPath, buffer, part++);
+              buffer.clear();
+            } catch (EOFException ex) {
+              writeBuffer(destPath, buffer, part++);
+              break;
             }
-            writeBuffer(destPath, buffer, part++);
-            buffer.clear();
-          } catch (EOFException ex) {
-            writeBuffer(destPath, buffer, part++);
-            break;
           }
+          fs.create(new Path(destPath, "finished")).close();
+          log.info("Finished log sort " + name + " " + getBytesCopied() + " 
bytes " + part + " parts in " + getSortTime() + "ms");
         }
-        fs.create(new Path(destPath, "finished")).close();
-        log.info("Finished log sort " + name + " " + getBytesCopied() + " 
bytes " + part + " parts in " + getSortTime() + "ms");
       } catch (Throwable t) {
         try {
           // parent dir may not exist
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index d5a4db9bda..fb286c4ccf 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -38,6 +38,7 @@
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.log.MultiReader;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
@@ -100,28 +101,30 @@ public static void main(String[] args) throws IOException {
       LogFileValue value = new LogFileValue();
 
       if (fs.isFile(path)) {
-        // read log entries from a simple hdfs file
-        DFSLoggerInputStreams streams;
-        try {
-          streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getInstance());
-        } catch (LogHeaderIncompleteException e) {
-          log.warn("Could not read header for " + path + ". Ignoring...");
-          continue;
-        }
-        DataInputStream input = streams.getDecryptingInputStream();
-
-        try {
-          while (true) {
-            try {
-              key.readFields(input);
-              value.readFields(input);
-            } catch (EOFException ex) {
-              break;
+        try (final FSDataInputStream fsinput = fs.open(path)) {
+          // read log entries from a simple hdfs file
+          DFSLoggerInputStreams streams;
+          try {
+            streams = DfsLogger.readHeaderAndReturnStream(fsinput, SiteConfiguration.getInstance());
+          } catch (LogHeaderIncompleteException e) {
+            log.warn("Could not read header for " + path + ". Ignoring...");
+            continue;
+          }
+          DataInputStream input = streams.getDecryptingInputStream();
+
+          try {
+            while (true) {
+              try {
+                key.readFields(input);
+                value.readFields(input);
+              } catch (EOFException ex) {
+                break;
+              }
+              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
             }
-            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+          } finally {
+            input.close();
           }
-        } finally {
-          input.close();
         }
       } else {
         // read the log entries sorted in a map file
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 24b7756d22..97c8b7eab6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -77,6 +77,7 @@
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.transport.TTransportException;
@@ -321,46 +322,43 @@ public String execute(ReplicationCoordinator.Client client) throws Exception {
   protected Status replicateRFiles(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status,
       final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper, long timeout) throws TTransportException,
       AccumuloException, AccumuloSecurityException {
-    DataInputStream input;
-    try {
-      input = getRFileInputStream(p);
-    } catch (IOException e) {
-      log.error("Could not create input stream from RFile, will retry", e);
-      return status;
-    }
-
-    Status lastStatus = status, currentStatus = status;
-    while (true) {
-      // Read and send a batch of mutations
-      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new RFileClientExecReturn(target, input, p,
-          currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
+    try (final DataInputStream input = getRFileInputStream(p)) {
+      Status lastStatus = status, currentStatus = status;
+      while (true) {
+        // Read and send a batch of mutations
+        ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new RFileClientExecReturn(target, input, p,
+            currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
 
-      // Catch the overflow
-      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
-      if (newBegin < 0) {
-        newBegin = Long.MAX_VALUE;
-      }
+        // Catch the overflow
+        long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+        if (newBegin < 0) {
+          newBegin = Long.MAX_VALUE;
+        }
 
-      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+        currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
 
-      log.debug("Sent batch for replication of {} to {}, with new Status {}", 
p, target, ProtobufUtil.toString(currentStatus));
+        log.debug("Sent batch for replication of {} to {}, with new Status 
{}", p, target, ProtobufUtil.toString(currentStatus));
 
-      // If we got a different status
-      if (!currentStatus.equals(lastStatus)) {
-        // If we don't have any more work, just quit
-        if (!StatusUtil.isWorkRequired(currentStatus)) {
-          return currentStatus;
+        // If we got a different status
+        if (!currentStatus.equals(lastStatus)) {
+          // If we don't have any more work, just quit
+          if (!StatusUtil.isWorkRequired(currentStatus)) {
+            return currentStatus;
+          } else {
+            // Otherwise, let it loop and replicate some more data
+            lastStatus = currentStatus;
+          }
         } else {
-          // Otherwise, let it loop and replicate some more data
-          lastStatus = currentStatus;
-        }
-      } else {
-        log.debug("Did not replicate any new data for {} to {}, (state was 
{})", p, target, ProtobufUtil.toString(lastStatus));
+          log.debug("Did not replicate any new data for {} to {}, (state was 
{})", p, target, ProtobufUtil.toString(lastStatus));
 
-        // otherwise, we didn't actually replicate (likely because there was error sending the data)
-        // we can just not record any updates, and it will be picked up again by the work assigner
-        return status;
+          // otherwise, we didn't actually replicate (likely because there was error sending the data)
+          // we can just not record any updates, and it will be picked up again by the work assigner
+          return status;
+        }
       }
+    } catch (IOException e) {
+      log.error("Could not create input stream from RFile, will retry", e);
+      return status;
     }
   }
 
@@ -370,11 +368,112 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer
 
     log.debug("Replication WAL to peer tserver");
     final Set<Integer> tids;
-    final DataInputStream input;
-    Span span = Trace.start("Read WAL header");
-    span.data("file", p.toString());
-    try {
-      input = getWalStream(p);
+    try (final FSDataInputStream fsinput = fs.open(p); final DataInputStream input = getWalStream(p, fsinput)) {
+      log.debug("Skipping unwanted data in WAL");
+      Span span = Trace.start("Consume WAL prefix");
+      span.data("file", p.toString());
+      try {
+        // We want to read all records in the WAL up to the "begin" offset contained in the Status message,
+        // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations
+        tids = consumeWalPrefix(target, input, p, status, sizeLimit);
+      } catch (IOException e) {
+        log.warn("Unexpected error consuming file.");
+        return status;
+      } finally {
+        span.stop();
+      }
+
+      log.debug("Sending batches of data to peer tserver");
+
+      Status lastStatus = status, currentStatus = status;
+      final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+      while (true) {
+        // Set some trace info
+        span = Trace.start("Replicate WAL batch");
+        span.data("Batch size (bytes)", Long.toString(sizeLimit));
+        span.data("File", p.toString());
+        span.data("Peer instance name", 
peerContext.getInstance().getInstanceName());
+        span.data("Peer tserver", peerTserver.toString());
+        span.data("Remote table ID", remoteTableId);
+
+        ReplicationStats replResult;
+        try {
+          // Read and send a batch of mutations
+          replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus,
+              sizeLimit, remoteTableId, tcreds, tids), timeout);
+        } catch (Exception e) {
+          log.error("Caught exception replicating data to {} at {}", 
peerContext.getInstance().getInstanceName(), peerTserver, e);
+          throw e;
+        } finally {
+          span.stop();
+        }
+
+        // Catch the overflow
+        long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+        if (newBegin < 0) {
+          newBegin = Long.MAX_VALUE;
+        }
+
+        currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+
+        log.debug("Sent batch for replication of {} to {}, with new Status 
{}", p, target, ProtobufUtil.toString(currentStatus));
+
+        // If we got a different status
+        if (!currentStatus.equals(lastStatus)) {
+          span = Trace.start("Update replication table");
+          try {
+            if (null != accumuloUgi) {
+              final Status copy = currentStatus;
+              accumuloUgi.doAs(new PrivilegedAction<Void>() {
+                @Override
+                public Void run() {
+                  try {
+                    helper.recordNewStatus(p, copy, target);
+                  } catch (Exception e) {
+                    exceptionRef.set(e);
+                  }
+                  return null;
+                }
+              });
+              Exception e = exceptionRef.get();
+              if (null != e) {
+                if (e instanceof TableNotFoundException) {
+                  throw (TableNotFoundException) e;
+                } else if (e instanceof AccumuloSecurityException) {
+                  throw (AccumuloSecurityException) e;
+                } else if (e instanceof AccumuloException) {
+                  throw (AccumuloException) e;
+                } else {
+                  throw new RuntimeException("Received unexpected exception", 
e);
+                }
+              }
+            } else {
+              helper.recordNewStatus(p, currentStatus, target);
+            }
+          } catch (TableNotFoundException e) {
+            log.error("Tried to update status in replication table for {} as 
{}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
+            throw new RuntimeException("Replication table did not exist, will 
retry", e);
+          } finally {
+            span.stop();
+          }
+
+          log.debug("Recorded updated status for {}: {}", p, 
ProtobufUtil.toString(currentStatus));
+
+          // If we don't have any more work, just quit
+          if (!StatusUtil.isWorkRequired(currentStatus)) {
+            return currentStatus;
+          } else {
+            // Otherwise, let it loop and replicate some more data
+            lastStatus = currentStatus;
+          }
+        } else {
+          log.debug("Did not replicate any new data for {} to {}, (state was 
{})", p, target, ProtobufUtil.toString(lastStatus));
+
+          // otherwise, we didn't actually replicate (likely because there was error sending the data)
+          // we can just not record any updates, and it will be picked up again by the work assigner
+          return status;
+        }
+      }
     } catch (LogHeaderIncompleteException e) {
       log.warn("Could not read header from {}, assuming that there is no data 
present in the WAL, therefore replication is complete", p);
       Status newStatus;
@@ -384,7 +483,7 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer
       } else {
         newStatus = Status.newBuilder(status).setBegin(status.getEnd()).build();
       }
-      span = Trace.start("Update replication table");
+      Span span = Trace.start("Update replication table");
       try {
         helper.recordNewStatus(p, newStatus, target);
       } catch (TableNotFoundException tnfe) {
@@ -398,114 +497,6 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer
       log.error("Could not create stream for WAL", e);
       // No data sent (bytes nor records) and no progress made
       return status;
-    } finally {
-      span.stop();
-    }
-
-    log.debug("Skipping unwanted data in WAL");
-    span = Trace.start("Consume WAL prefix");
-    span.data("file", p.toString());
-    try {
-      // We want to read all records in the WAL up to the "begin" offset contained in the Status message,
-      // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations
-      tids = consumeWalPrefix(target, input, p, status, sizeLimit);
-    } catch (IOException e) {
-      log.warn("Unexpected error consuming file.");
-      return status;
-    } finally {
-      span.stop();
-    }
-
-    log.debug("Sending batches of data to peer tserver");
-
-    Status lastStatus = status, currentStatus = status;
-    final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
-    while (true) {
-      // Set some trace info
-      span = Trace.start("Replicate WAL batch");
-      span.data("Batch size (bytes)", Long.toString(sizeLimit));
-      span.data("File", p.toString());
-      span.data("Peer instance name", 
peerContext.getInstance().getInstanceName());
-      span.data("Peer tserver", peerTserver.toString());
-      span.data("Remote table ID", remoteTableId);
-
-      ReplicationStats replResult;
-      try {
-        // Read and send a batch of mutations
-        replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
-            remoteTableId, tcreds, tids), timeout);
-      } catch (Exception e) {
-        log.error("Caught exception replicating data to {} at {}", 
peerContext.getInstance().getInstanceName(), peerTserver, e);
-        throw e;
-      } finally {
-        span.stop();
-      }
-
-      // Catch the overflow
-      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
-      if (newBegin < 0) {
-        newBegin = Long.MAX_VALUE;
-      }
-
-      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
-
-      log.debug("Sent batch for replication of {} to {}, with new Status {}", 
p, target, ProtobufUtil.toString(currentStatus));
-
-      // If we got a different status
-      if (!currentStatus.equals(lastStatus)) {
-        span = Trace.start("Update replication table");
-        try {
-          if (null != accumuloUgi) {
-            final Status copy = currentStatus;
-            accumuloUgi.doAs(new PrivilegedAction<Void>() {
-              @Override
-              public Void run() {
-                try {
-                  helper.recordNewStatus(p, copy, target);
-                } catch (Exception e) {
-                  exceptionRef.set(e);
-                }
-                return null;
-              }
-            });
-            Exception e = exceptionRef.get();
-            if (null != e) {
-              if (e instanceof TableNotFoundException) {
-                throw (TableNotFoundException) e;
-              } else if (e instanceof AccumuloSecurityException) {
-                throw (AccumuloSecurityException) e;
-              } else if (e instanceof AccumuloException) {
-                throw (AccumuloException) e;
-              } else {
-                throw new RuntimeException("Received unexpected exception", e);
-              }
-            }
-          } else {
-            helper.recordNewStatus(p, currentStatus, target);
-          }
-        } catch (TableNotFoundException e) {
-          log.error("Tried to update status in replication table for {} as {}, 
but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
-          throw new RuntimeException("Replication table did not exist, will 
retry", e);
-        } finally {
-          span.stop();
-        }
-
-        log.debug("Recorded updated status for {}: {}", p, 
ProtobufUtil.toString(currentStatus));
-
-        // If we don't have any more work, just quit
-        if (!StatusUtil.isWorkRequired(currentStatus)) {
-          return currentStatus;
-        } else {
-          // Otherwise, let it loop and replicate some more data
-          lastStatus = currentStatus;
-        }
-      } else {
-        log.debug("Did not replicate any new data for {} to {}, (state was 
{})", p, target, ProtobufUtil.toString(lastStatus));
-
-        // otherwise, we didn't actually replicate (likely because there was error sending the data)
-        // we can just not record any updates, and it will be picked up again by the work assigner
-        return status;
-      }
     }
   }
 
@@ -687,9 +678,15 @@ protected RFileReplication getKeyValues(ReplicationTarget target, DataInputStrea
     return tids;
   }
 
-  public DataInputStream getWalStream(Path p) throws IOException {
-    DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf);
-    return streams.getDecryptingInputStream();
+  public DataInputStream getWalStream(Path p, FSDataInputStream input) throws IOException {
+    Span span = Trace.start("Read WAL header");
+    span.data("file", p.toString());
+    try {
+      DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(input, conf);
+      return streams.getDecryptingInputStream();
+    } finally {
+      span.stop();
+    }
   }
 
   protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit, Set<Integer> desiredTids)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
