This is an automated email from the ASF dual-hosted git repository.

adamjshook pushed a commit to branch 1.7
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.7 by this push:
     new 229eb7b  [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem (#369)
229eb7b is described below

commit 229eb7be7f040ecdf1f0238201533529da59bca7
Author: Adam J. Shook <adamjsh...@gmail.com>
AuthorDate: Wed Feb 14 14:15:05 2018 -0500

    [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem (#369)
---
 .../org/apache/accumulo/tserver/log/DfsLogger.java |   5 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |  68 ++---
 .../apache/accumulo/tserver/logger/LogReader.java  |  43 +--
 .../tserver/replication/AccumuloReplicaSystem.java | 297 ++++++++++-----------
 .../accumulo/tserver/log/LocalWALRecoveryTest.java |   2 +-
 5 files changed, 208 insertions(+), 207 deletions(-)
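
The upshot of the change: DfsLogger.readHeaderAndReturnStream() now takes an already-opened FSDataInputStream instead of opening the path itself, so each caller owns the stream and can close it with try-with-resources even when header parsing fails. A minimal sketch of the caller-side pattern used throughout this patch (variable names are illustrative, not copied verbatim from any one call site):

    try (final FSDataInputStream fsinput = fs.open(path)) {
      // Header parsing may throw LogHeaderIncompleteException; the stream
      // is still closed when the try block exits.
      DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fsinput, conf);
      DataInputStream input = streams.getDecryptingInputStream();
      // ... read LogFileKey/LogFileValue pairs from input ...
    }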

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 8fd2b7a..7b8221f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -298,8 +298,7 @@ public class DfsLogger {
     metaReference = meta;
   }
 
-  public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException {
-    FSDataInputStream input = fs.open(path);
+  public static DFSLoggerInputStreams readHeaderAndReturnStream(FSDataInputStream input, AccumuloConfiguration conf) throws IOException {
     DataInputStream decryptingInput = null;
 
     byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes(UTF_8);
@@ -375,7 +374,7 @@ public class DfsLogger {
 
       }
     } catch (EOFException e) {
-      log.warn("Got EOFException trying to read WAL header information, assuming the rest of the file (" + path + ") has no data.");
+      log.warn("Got EOFException trying to read WAL header information, assuming the rest of the file has no data.");
      // A TabletServer might have died before the (complete) header was written
       throw new LogHeaderIncompleteException(e);
     }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 11097ce..ba5e488 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -113,44 +113,46 @@ public class LogSorter {
        // the following call does not throw an exception if the file/dir does not exist
         fs.deleteRecursively(new Path(destPath));
 
-        DFSLoggerInputStreams inputStreams;
-        try {
-          inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf);
-        } catch (LogHeaderIncompleteException e) {
-          log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting.");
-          // Creating a 'finished' marker will cause recovery to proceed normally and the
-          // empty file will be correctly ignored downstream.
-          fs.mkdirs(new Path(destPath));
-          writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
-          fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
-          return;
-        }
+        try (final FSDataInputStream fsinput = fs.open(srcPath)) {
+          DFSLoggerInputStreams inputStreams;
+          try {
+            inputStreams = DfsLogger.readHeaderAndReturnStream(fsinput, conf);
+          } catch (LogHeaderIncompleteException e) {
+            log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting.");
+            // Creating a 'finished' marker will cause recovery to proceed normally and the
+            // empty file will be correctly ignored downstream.
+            fs.mkdirs(new Path(destPath));
+            writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
+            fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+            return;
+          }
 
-        this.input = inputStreams.getOriginalInput();
-        this.decryptingInput = inputStreams.getDecryptingInputStream();
+          this.input = inputStreams.getOriginalInput();
+          this.decryptingInput = inputStreams.getDecryptingInputStream();
 
-        final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
-        Thread.currentThread().setName("Sorting " + name + " for recovery");
-        while (true) {
-          final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
-          try {
-            long start = input.getPos();
-            while (input.getPos() - start < bufferSize) {
-              LogFileKey key = new LogFileKey();
-              LogFileValue value = new LogFileValue();
-              key.readFields(decryptingInput);
-              value.readFields(decryptingInput);
-              buffer.add(new Pair<>(key, value));
+          final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
+          Thread.currentThread().setName("Sorting " + name + " for recovery");
+          while (true) {
+            final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+            try {
+              long start = input.getPos();
+              while (input.getPos() - start < bufferSize) {
+                LogFileKey key = new LogFileKey();
+                LogFileValue value = new LogFileValue();
+                key.readFields(decryptingInput);
+                value.readFields(decryptingInput);
+                buffer.add(new Pair<>(key, value));
+              }
+              writeBuffer(destPath, buffer, part++);
+              buffer.clear();
+            } catch (EOFException ex) {
+              writeBuffer(destPath, buffer, part++);
+              break;
             }
-            writeBuffer(destPath, buffer, part++);
-            buffer.clear();
-          } catch (EOFException ex) {
-            writeBuffer(destPath, buffer, part++);
-            break;
           }
+          fs.create(new Path(destPath, "finished")).close();
+          log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
         }
-        fs.create(new Path(destPath, "finished")).close();
-        log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
       } catch (Throwable t) {
         try {
           // parent dir may not exist
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 23a9fab..928861e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.log.MultiReader;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
@@ -100,28 +101,30 @@ public class LogReader {
       LogFileValue value = new LogFileValue();
 
       if (fs.isFile(path)) {
-        // read log entries from a simple hdfs file
-        DFSLoggerInputStreams streams;
-        try {
-          streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getInstance());
-        } catch (LogHeaderIncompleteException e) {
-          log.warn("Could not read header for " + path + ". Ignoring...");
-          continue;
-        }
-        DataInputStream input = streams.getDecryptingInputStream();
-
-        try {
-          while (true) {
-            try {
-              key.readFields(input);
-              value.readFields(input);
-            } catch (EOFException ex) {
-              break;
+        try (final FSDataInputStream fsinput = fs.open(path)) {
+          // read log entries from a simple hdfs file
+          DFSLoggerInputStreams streams;
+          try {
+            streams = DfsLogger.readHeaderAndReturnStream(fsinput, SiteConfiguration.getInstance());
+          } catch (LogHeaderIncompleteException e) {
+            log.warn("Could not read header for " + path + ". Ignoring...");
+            continue;
+          }
+          DataInputStream input = streams.getDecryptingInputStream();
+
+          try {
+            while (true) {
+              try {
+                key.readFields(input);
+                value.readFields(input);
+              } catch (EOFException ex) {
+                break;
+              }
+              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
            }
-            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+          } finally {
+            input.close();
           }
-        } finally {
-          input.close();
         }
       } else {
         // read the log entries sorted in a map file
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 28eca21..b910707 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -76,6 +76,7 @@ import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.transport.TTransportException;
@@ -320,46 +321,43 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
   protected Status replicateRFiles(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status,
       final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper, long timeout) throws TTransportException,
      AccumuloException, AccumuloSecurityException {
-    DataInputStream input;
-    try {
-      input = getRFileInputStream(p);
-    } catch (IOException e) {
-      log.error("Could not create input stream from RFile, will retry", e);
-      return status;
-    }
-
-    Status lastStatus = status, currentStatus = status;
-    while (true) {
-      // Read and send a batch of mutations
-      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new RFileClientExecReturn(target, input, p,
-          currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
+    try (final DataInputStream input = getRFileInputStream(p)) {
+      Status lastStatus = status, currentStatus = status;
+      while (true) {
+        // Read and send a batch of mutations
+        ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new RFileClientExecReturn(target, input, p,
+            currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
 
-      // Catch the overflow
-      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
-      if (newBegin < 0) {
-        newBegin = Long.MAX_VALUE;
-      }
+        // Catch the overflow
+        long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+        if (newBegin < 0) {
+          newBegin = Long.MAX_VALUE;
+        }
 
-      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+        currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
 
-      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
+        log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
 
-      // If we got a different status
-      if (!currentStatus.equals(lastStatus)) {
-        // If we don't have any more work, just quit
-        if (!StatusUtil.isWorkRequired(currentStatus)) {
-          return currentStatus;
+        // If we got a different status
+        if (!currentStatus.equals(lastStatus)) {
+          // If we don't have any more work, just quit
+          if (!StatusUtil.isWorkRequired(currentStatus)) {
+            return currentStatus;
+          } else {
+            // Otherwise, let it loop and replicate some more data
+            lastStatus = currentStatus;
+          }
         } else {
-          // Otherwise, let it loop and replicate some more data
-          lastStatus = currentStatus;
-        }
-      } else {
-        log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
+          log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
 
-        // otherwise, we didn't actually replicate (likely because there was error sending the data)
-        // we can just not record any updates, and it will be picked up again by the work assigner
-        return status;
+          // otherwise, we didn't actually replicate (likely because there was error sending the data)
+          // we can just not record any updates, and it will be picked up again by the work assigner
+          return status;
+        }
       }
+    } catch (IOException e) {
+      log.error("Could not create input stream from RFile, will retry", e);
+      return status;
     }
   }
 
@@ -369,11 +367,112 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
     log.debug("Replication WAL to peer tserver");
     final Set<Integer> tids;
-    final DataInputStream input;
-    Span span = Trace.start("Read WAL header");
-    span.data("file", p.toString());
-    try {
-      input = getWalStream(p);
+    try (final FSDataInputStream fsinput = fs.open(p); final DataInputStream input = getWalStream(p, fsinput)) {
+      log.debug("Skipping unwanted data in WAL");
+      Span span = Trace.start("Consume WAL prefix");
+      span.data("file", p.toString());
+      try {
+        // We want to read all records in the WAL up to the "begin" offset contained in the Status message,
+        // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations
+        tids = consumeWalPrefix(target, input, p, status, sizeLimit);
+      } catch (IOException e) {
+        log.warn("Unexpected error consuming file.");
+        return status;
+      } finally {
+        span.stop();
+      }
+
+      log.debug("Sending batches of data to peer tserver");
+
+      Status lastStatus = status, currentStatus = status;
+      final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+      while (true) {
+        // Set some trace info
+        span = Trace.start("Replicate WAL batch");
+        span.data("Batch size (bytes)", Long.toString(sizeLimit));
+        span.data("File", p.toString());
+        span.data("Peer instance name", 
peerContext.getInstance().getInstanceName());
+        span.data("Peer tserver", peerTserver.toString());
+        span.data("Remote table ID", remoteTableId);
+
+        ReplicationStats replResult;
+        try {
+          // Read and send a batch of mutations
+          replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus,
+              sizeLimit, remoteTableId, tcreds, tids), timeout);
+        } catch (Exception e) {
+          log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(), peerTserver, e);
+          throw e;
+        } finally {
+          span.stop();
+        }
+
+        // Catch the overflow
+        long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+        if (newBegin < 0) {
+          newBegin = Long.MAX_VALUE;
+        }
+
+        currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+
+        log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
+
+        // If we got a different status
+        if (!currentStatus.equals(lastStatus)) {
+          span = Trace.start("Update replication table");
+          try {
+            if (null != accumuloUgi) {
+              final Status copy = currentStatus;
+              accumuloUgi.doAs(new PrivilegedAction<Void>() {
+                @Override
+                public Void run() {
+                  try {
+                    helper.recordNewStatus(p, copy, target);
+                  } catch (Exception e) {
+                    exceptionRef.set(e);
+                  }
+                  return null;
+                }
+              });
+              Exception e = exceptionRef.get();
+              if (null != e) {
+                if (e instanceof TableNotFoundException) {
+                  throw (TableNotFoundException) e;
+                } else if (e instanceof AccumuloSecurityException) {
+                  throw (AccumuloSecurityException) e;
+                } else if (e instanceof AccumuloException) {
+                  throw (AccumuloException) e;
+                } else {
+                  throw new RuntimeException("Received unexpected exception", e);
+                }
+              }
+            } else {
+              helper.recordNewStatus(p, currentStatus, target);
+            }
+          } catch (TableNotFoundException e) {
+            log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
+            throw new RuntimeException("Replication table did not exist, will retry", e);
+          } finally {
+            span.stop();
+          }
+
+          log.debug("Recorded updated status for {}: {}", p, 
ProtobufUtil.toString(currentStatus));
+
+          // If we don't have any more work, just quit
+          if (!StatusUtil.isWorkRequired(currentStatus)) {
+            return currentStatus;
+          } else {
+            // Otherwise, let it loop and replicate some more data
+            lastStatus = currentStatus;
+          }
+        } else {
+          log.debug("Did not replicate any new data for {} to {}, (state was 
{})", p, target, ProtobufUtil.toString(lastStatus));
+
+          // otherwise, we didn't actually replicate (likely because there was 
error sending the data)
+          // we can just not record any updates, and it will be picked up 
again by the work assigner
+          return status;
+        }
+      }
     } catch (LogHeaderIncompleteException e) {
       log.warn("Could not read header from {}, assuming that there is no data 
present in the WAL, therefore replication is complete", p);
       Status newStatus;
@@ -383,7 +482,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
      } else {
        newStatus = Status.newBuilder(status).setBegin(status.getEnd()).build();
       }
-      span = Trace.start("Update replication table");
+      Span span = Trace.start("Update replication table");
       try {
         helper.recordNewStatus(p, newStatus, target);
       } catch (TableNotFoundException tnfe) {
@@ -397,114 +496,6 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       log.error("Could not create stream for WAL", e);
       // No data sent (bytes nor records) and no progress made
       return status;
-    } finally {
-      span.stop();
-    }
-
-    log.debug("Skipping unwanted data in WAL");
-    span = Trace.start("Consume WAL prefix");
-    span.data("file", p.toString());
-    try {
-      // We want to read all records in the WAL up to the "begin" offset contained in the Status message,
-      // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations
-      tids = consumeWalPrefix(target, input, p, status, sizeLimit);
-    } catch (IOException e) {
-      log.warn("Unexpected error consuming file.");
-      return status;
-    } finally {
-      span.stop();
-    }
-
-    log.debug("Sending batches of data to peer tserver");
-
-    Status lastStatus = status, currentStatus = status;
-    final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
-    while (true) {
-      // Set some trace info
-      span = Trace.start("Replicate WAL batch");
-      span.data("Batch size (bytes)", Long.toString(sizeLimit));
-      span.data("File", p.toString());
-      span.data("Peer instance name", 
peerContext.getInstance().getInstanceName());
-      span.data("Peer tserver", peerTserver.toString());
-      span.data("Remote table ID", remoteTableId);
-
-      ReplicationStats replResult;
-      try {
-        // Read and send a batch of mutations
-        replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
-            remoteTableId, tcreds, tids), timeout);
-      } catch (Exception e) {
-        log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(), peerTserver, e);
-        throw e;
-      } finally {
-        span.stop();
-      }
-
-      // Catch the overflow
-      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
-      if (newBegin < 0) {
-        newBegin = Long.MAX_VALUE;
-      }
-
-      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
-
-      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
-
-      // If we got a different status
-      if (!currentStatus.equals(lastStatus)) {
-        span = Trace.start("Update replication table");
-        try {
-          if (null != accumuloUgi) {
-            final Status copy = currentStatus;
-            accumuloUgi.doAs(new PrivilegedAction<Void>() {
-              @Override
-              public Void run() {
-                try {
-                  helper.recordNewStatus(p, copy, target);
-                } catch (Exception e) {
-                  exceptionRef.set(e);
-                }
-                return null;
-              }
-            });
-            Exception e = exceptionRef.get();
-            if (null != e) {
-              if (e instanceof TableNotFoundException) {
-                throw (TableNotFoundException) e;
-              } else if (e instanceof AccumuloSecurityException) {
-                throw (AccumuloSecurityException) e;
-              } else if (e instanceof AccumuloException) {
-                throw (AccumuloException) e;
-              } else {
-                throw new RuntimeException("Received unexpected exception", e);
-              }
-            }
-          } else {
-            helper.recordNewStatus(p, currentStatus, target);
-          }
-        } catch (TableNotFoundException e) {
-          log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
-          throw new RuntimeException("Replication table did not exist, will retry", e);
-        } finally {
-          span.stop();
-        }
-
-        log.debug("Recorded updated status for {}: {}", p, 
ProtobufUtil.toString(currentStatus));
-
-        // If we don't have any more work, just quit
-        if (!StatusUtil.isWorkRequired(currentStatus)) {
-          return currentStatus;
-        } else {
-          // Otherwise, let it loop and replicate some more data
-          lastStatus = currentStatus;
-        }
-      } else {
-        log.debug("Did not replicate any new data for {} to {}, (state was 
{})", p, target, ProtobufUtil.toString(lastStatus));
-
-        // otherwise, we didn't actually replicate (likely because there was 
error sending the data)
-        // we can just not record any updates, and it will be picked up again 
by the work assigner
-        return status;
-      }
     }
   }
 
@@ -686,9 +677,15 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     return tids;
   }
 
-  public DataInputStream getWalStream(Path p) throws IOException {
-    DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf);
-    return streams.getDecryptingInputStream();
+  public DataInputStream getWalStream(Path p, FSDataInputStream input) throws IOException {
+    Span span = Trace.start("Read WAL header");
+    span.data("file", p.toString());
+    try {
+      DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(input, conf);
+      return streams.getDecryptingInputStream();
+    } finally {
+      span.stop();
+    }
   }
 
  protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit, Set<Integer> desiredTids)
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
index 8261a17..f1f6a3a 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
@@ -78,7 +78,7 @@ public class LocalWALRecoveryTest {
     final Path path = recovered[0].getPath();
     final VolumeManager volumeManager = VolumeManagerImpl.getLocal(folder.getRoot().getAbsolutePath());
 
-    final DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(volumeManager, path, configuration);
+    final DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(volumeManager.open(path), configuration);
     final DataInputStream input = streams.getDecryptingInputStream();
 
     final LogFileKey key = new LogFileKey();

-- 
To stop receiving notification emails like this one, please contact
adamjsh...@apache.org.
