[GitHub] ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem

2018-02-12 Thread GitBox
ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close 
input stream in AccumuloReplicaSystem
URL: https://github.com/apache/accumulo/pull/369#discussion_r167680196
 
 

 ##
 File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
 ##
 @@ -414,7 +413,7 @@ public static DFSLoggerInputStreams 
readHeaderAndReturnStream(VolumeManager fs,
 
   }
 } catch (EOFException e) {
-  log.warn("Got EOFException trying to read WAL header information, 
assuming the rest of the file (" + path + ") has no data.");
+  log.warn("Got EOFException trying to read WAL header information, 
assuming the rest of the file has no data.");
 
 Review comment:
  I'm not a fan of both logging *and* throwing. I think it's generally better
to pick one or the other: handle the exception (by logging, in this case) or
pass it up. Dropping the path makes it even more obvious that this might be a
redundant message. My first instinct was to suggest adding `, e` to the
`log.warn` call, but that's probably not needed if the
`LogHeaderIncompleteException` keeps the stack trace for logging later.
   
   It's not a big deal... but something to think about if there's an obvious 
cleanup here.
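Picking "pass it up" usually means wrapping the original exception as the cause, so the stack trace survives even when the local `log.warn` is dropped. A minimal, hypothetical sketch of that pattern (the `LogHeaderIncompleteException` stand-in below is illustrative, not Accumulo's actual class):

```java
import java.io.EOFException;

// Stand-in for the real exception class; carries the EOFException as its cause.
class LogHeaderIncompleteException extends Exception {
    LogHeaderIncompleteException(Throwable cause) {
        super("Incomplete WAL header", cause);
    }
}

public class HeaderRead {
    static void readHeader(byte[] data) throws LogHeaderIncompleteException {
        try {
            if (data.length < 4) {
                throw new EOFException("short read");
            }
        } catch (EOFException e) {
            // No log.warn here: the EOFException travels up as the cause,
            // so whoever catches LogHeaderIncompleteException logs it once.
            throw new LogHeaderIncompleteException(e);
        }
    }

    public static void main(String[] args) {
        try {
            readHeader(new byte[2]);
        } catch (LogHeaderIncompleteException e) {
            // The original stack trace is preserved on the cause.
            System.out.println(e.getCause().getClass().getSimpleName());
        }
    }
}
```

With this shape, the caller that catches `LogHeaderIncompleteException` decides once whether to warn, and nothing is logged twice.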


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem

2018-02-12 Thread GitBox
ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close 
input stream in AccumuloReplicaSystem
URL: https://github.com/apache/accumulo/pull/369#discussion_r167682325
 
 

 ##
 File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
 ##
 @@ -113,44 +113,46 @@ public void sort(String name, Path srcPath, String 
destPath) {
 // the following call does not throw an exception if the file/dir does 
not exist
 fs.deleteRecursively(new Path(destPath));
 
-DFSLoggerInputStreams inputStreams;
-try {
-  inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, 
conf);
-} catch (LogHeaderIncompleteException e) {
-  log.warn("Could not read header from write-ahead log " + srcPath + 
". Not sorting.");
-  // Creating a 'finished' marker will cause recovery to proceed 
normally and the
-  // empty file will be correctly ignored downstream.
-  fs.mkdirs(new Path(destPath));
-  writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> 
emptyList(), part++);
-  fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
-  return;
-}
+try (final FSDataInputStream fsinput = fs.open(srcPath)) {
+  DFSLoggerInputStreams inputStreams;
+  try {
+inputStreams = DfsLogger.readHeaderAndReturnStream(fsinput, conf);
 
 Review comment:
  I'm not exactly sure what kind of additional resources the crypto stream
might use, but it seems to me that since `inputStreams` is not `Closeable`, it
may be possible for `fsinput` to be closed while the crypto stream in
`inputStreams` is still holding resources. Would need to dig further to be
sure, but it would be nice if `DFSLoggerInputStreams` were `Closeable` and
created as a second resource in the `try-with-resources` block on line 116.
   
   Same comment applies to other uses of `DFSLoggerInputStreams`.
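The suggestion above can be sketched as follows. This is a hedged illustration, assuming a hypothetical `LoggerStreams` wrapper rather than Accumulo's actual `DFSLoggerInputStreams` API: once the wrapper implements `Closeable`, it can be listed as a second resource and both streams are released in reverse order.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

// Illustrative stand-in for DFSLoggerInputStreams, made Closeable.
class LoggerStreams implements Closeable {
    private final InputStream original;
    private final InputStream decrypting; // may hold extra buffers/native state

    LoggerStreams(InputStream original, InputStream decrypting) {
        this.original = original;
        this.decrypting = decrypting;
    }

    InputStream getDecryptingStream() { return decrypting; }

    @Override
    public void close() throws IOException {
        // Close the outermost stream; decorators close their delegates.
        decrypting.close();
    }
}

public class SortSketch {
    public static void main(String[] args) throws IOException {
        InputStream raw = new ByteArrayInputStream(new byte[] {1, 2, 3});
        // Two resources: the raw stream, then the wrapper built from it.
        try (InputStream fsinput = raw;
             LoggerStreams streams = new LoggerStreams(fsinput, fsinput)) {
            System.out.println(streams.getDecryptingStream().read());
        } // both close() calls run here, in reverse declaration order
    }
}
```

Declaring the wrapper as the second resource means the crypto stream is closed even if the header read throws, without any explicit `finally` block.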




[GitHub] ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem

2018-02-12 Thread GitBox
ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close 
input stream in AccumuloReplicaSystem
URL: https://github.com/apache/accumulo/pull/369#discussion_r167678913
 
 

 ##
 File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
 ##
 @@ -370,11 +368,112 @@ protected Status replicateLogs(ClientContext 
peerContext, final HostAndPort peer
 
 log.debug("Replication WAL to peer tserver");
final Set<Integer> tids;
-final DataInputStream input;
-Span span = Trace.start("Read WAL header");
-span.data("file", p.toString());
-try {
-  input = getWalStream(p);
+try (final FSDataInputStream fsinput = fs.open(p); final DataInputStream 
input = getWalStream(p, fsinput)) {
 
 Review comment:
   That might be configurable. Would have to investigate the options in newer 
Eclipse versions. It doesn't matter to me... if it's configurable, I'll defer 
to others as to what is most readable.




[GitHub] ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem

2018-01-26 Thread GitBox
ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close 
input stream in AccumuloReplicaSystem
URL: https://github.com/apache/accumulo/pull/369#discussion_r164253587
 
 

 ##
 File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
 ##
 @@ -420,91 +420,99 @@ protected Status replicateLogs(ClientContext 
peerContext, final HostAndPort peer
 
 Status lastStatus = status, currentStatus = status;
final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
-while (true) {
-  // Set some trace info
-  span = Trace.start("Replicate WAL batch");
-  span.data("Batch size (bytes)", Long.toString(sizeLimit));
-  span.data("File", p.toString());
-  span.data("Peer instance name", 
peerContext.getInstance().getInstanceName());
-  span.data("Peer tserver", peerTserver.toString());
-  span.data("Remote table ID", remoteTableId);
-
-  ReplicationStats replResult;
-  try {
-// Read and send a batch of mutations
-replResult = ReplicationClient.executeServicerWithReturn(peerContext, 
peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
-remoteTableId, tcreds, tids), timeout);
-  } catch (Exception e) {
-log.error("Caught exception replicating data to {} at {}", 
peerContext.getInstance().getInstanceName(), peerTserver, e);
-throw e;
-  } finally {
-span.stop();
-  }
-
-  // Catch the overflow
-  long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
-  if (newBegin < 0) {
-newBegin = Long.MAX_VALUE;
-  }
-
-  currentStatus = 
Status.newBuilder(currentStatus).setBegin(newBegin).build();
+try {
+  while (true) {
+// Set some trace info
+span = Trace.start("Replicate WAL batch");
+span.data("Batch size (bytes)", Long.toString(sizeLimit));
+span.data("File", p.toString());
+span.data("Peer instance name", 
peerContext.getInstance().getInstanceName());
+span.data("Peer tserver", peerTserver.toString());
+span.data("Remote table ID", remoteTableId);
+
+ReplicationStats replResult;
+try {
+  // Read and send a batch of mutations
+  replResult = 
ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new 
WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
+  remoteTableId, tcreds, tids), timeout);
+} catch (Exception e) {
+  log.error("Caught exception replicating data to {} at {}", 
peerContext.getInstance().getInstanceName(), peerTserver, e);
+  throw e;
+} finally {
+  span.stop();
+}
 
-  log.debug("Sent batch for replication of {} to {}, with new Status {}", 
p, target, ProtobufUtil.toString(currentStatus));
+// Catch the overflow
+long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+if (newBegin < 0) {
+  newBegin = Long.MAX_VALUE;
+}
 
-  // If we got a different status
-  if (!currentStatus.equals(lastStatus)) {
-span = Trace.start("Update replication table");
-try {
-  if (null != accumuloUgi) {
-final Status copy = currentStatus;
-accumuloUgi.doAs(new PrivilegedAction<Void>() {
-  @Override
-  public Void run() {
-try {
-  helper.recordNewStatus(p, copy, target);
-} catch (Exception e) {
-  exceptionRef.set(e);
+currentStatus = 
Status.newBuilder(currentStatus).setBegin(newBegin).build();
+
+log.debug("Sent batch for replication of {} to {}, with new Status 
{}", p, target, ProtobufUtil.toString(currentStatus));
+
+// If we got a different status
+if (!currentStatus.equals(lastStatus)) {
+  span = Trace.start("Update replication table");
+  try {
+if (null != accumuloUgi) {
+  final Status copy = currentStatus;
+  accumuloUgi.doAs(new PrivilegedAction<Void>() {
+@Override
+public Void run() {
+  try {
+helper.recordNewStatus(p, copy, target);
+  } catch (Exception e) {
+exceptionRef.set(e);
+  }
+  return null;
+}
+  });
+  Exception e = exceptionRef.get();
+  if (null != e) {
+if (e instanceof TableNotFoundException) {
+  throw (TableNotFoundException) e;
+} else if (e instanceof AccumuloSecurityException) {
+  throw (AccumuloSecurityException) e;
+} else if (e instanceof AccumuloException) {
+  throw (AccumuloException) e;
+} else {
+  throw new RuntimeException("Received unexpected 
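The hunk above (truncated in this archive) tunnels checked exceptions out of a `PrivilegedAction` through an `AtomicReference`, since `PrivilegedAction.run()` cannot throw checked exceptions. A minimal standalone sketch of that pattern, with illustrative names in place of Accumulo's (`doAs` below is a stand-in for `accumuloUgi.doAs`):

```java
import java.security.PrivilegedAction;
import java.util.concurrent.atomic.AtomicReference;

public class DoAsSketch {
    // Stand-in for accumuloUgi.doAs(...): runs the action directly.
    static <T> T doAs(PrivilegedAction<T> action) {
        return action.run();
    }

    // Simulates a helper that can fail, like helper.recordNewStatus(...).
    static void recordNewStatus() throws Exception {
        throw new Exception("table missing");
    }

    public static void main(String[] args) {
        final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
        doAs(new PrivilegedAction<Void>() {
            @Override
            public Void run() {
                try {
                    recordNewStatus();
                } catch (Exception e) {
                    // run() can't throw checked exceptions, so stash it.
                    exceptionRef.set(e);
                }
                return null;
            }
        });
        // Back outside the action, unwrap and rethrow (or handle) the failure.
        Exception e = exceptionRef.get();
        if (e != null) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```

After the `doAs` call returns, the caller inspects the reference and rethrows the stashed exception by type, which is what the instanceof chain in the diff is doing.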

[GitHub] ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem

2018-01-26 Thread GitBox
ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close 
input stream in AccumuloReplicaSystem
URL: https://github.com/apache/accumulo/pull/369#discussion_r164253697
 
 

 ##
 File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
 ##
 @@ -420,91 +420,99 @@ protected Status replicateLogs(ClientContext 
peerContext, final HostAndPort peer
 
 Status lastStatus = status, currentStatus = status;
final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
-while (true) {
-  // Set some trace info
-  span = Trace.start("Replicate WAL batch");
-  span.data("Batch size (bytes)", Long.toString(sizeLimit));
-  span.data("File", p.toString());
-  span.data("Peer instance name", 
peerContext.getInstance().getInstanceName());
-  span.data("Peer tserver", peerTserver.toString());
-  span.data("Remote table ID", remoteTableId);
-
-  ReplicationStats replResult;
-  try {
-// Read and send a batch of mutations
-replResult = ReplicationClient.executeServicerWithReturn(peerContext, 
peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
-remoteTableId, tcreds, tids), timeout);
-  } catch (Exception e) {
-log.error("Caught exception replicating data to {} at {}", 
peerContext.getInstance().getInstanceName(), peerTserver, e);
-throw e;
-  } finally {
-span.stop();
-  }
-
-  // Catch the overflow
-  long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
-  if (newBegin < 0) {
-newBegin = Long.MAX_VALUE;
-  }
-
-  currentStatus = 
Status.newBuilder(currentStatus).setBegin(newBegin).build();
+try {
+  while (true) {
+// Set some trace info
+span = Trace.start("Replicate WAL batch");
+span.data("Batch size (bytes)", Long.toString(sizeLimit));
+span.data("File", p.toString());
+span.data("Peer instance name", 
peerContext.getInstance().getInstanceName());
+span.data("Peer tserver", peerTserver.toString());
+span.data("Remote table ID", remoteTableId);
+
+ReplicationStats replResult;
+try {
+  // Read and send a batch of mutations
+  replResult = 
ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new 
WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
+  remoteTableId, tcreds, tids), timeout);
+} catch (Exception e) {
+  log.error("Caught exception replicating data to {} at {}", 
peerContext.getInstance().getInstanceName(), peerTserver, e);
+  throw e;
+} finally {
+  span.stop();
+}
 
-  log.debug("Sent batch for replication of {} to {}, with new Status {}", 
p, target, ProtobufUtil.toString(currentStatus));
+// Catch the overflow
+long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+if (newBegin < 0) {
+  newBegin = Long.MAX_VALUE;
+}
 
-  // If we got a different status
-  if (!currentStatus.equals(lastStatus)) {
-span = Trace.start("Update replication table");
-try {
-  if (null != accumuloUgi) {
-final Status copy = currentStatus;
-accumuloUgi.doAs(new PrivilegedAction<Void>() {
-  @Override
-  public Void run() {
-try {
-  helper.recordNewStatus(p, copy, target);
-} catch (Exception e) {
-  exceptionRef.set(e);
+currentStatus = 
Status.newBuilder(currentStatus).setBegin(newBegin).build();
+
+log.debug("Sent batch for replication of {} to {}, with new Status 
{}", p, target, ProtobufUtil.toString(currentStatus));
+
+// If we got a different status
+if (!currentStatus.equals(lastStatus)) {
+  span = Trace.start("Update replication table");
+  try {
+if (null != accumuloUgi) {
+  final Status copy = currentStatus;
+  accumuloUgi.doAs(new PrivilegedAction<Void>() {
+@Override
+public Void run() {
+  try {
+helper.recordNewStatus(p, copy, target);
+  } catch (Exception e) {
+exceptionRef.set(e);
+  }
+  return null;
+}
+  });
+  Exception e = exceptionRef.get();
+  if (null != e) {
+if (e instanceof TableNotFoundException) {
+  throw (TableNotFoundException) e;
+} else if (e instanceof AccumuloSecurityException) {
+  throw (AccumuloSecurityException) e;
+} else if (e instanceof AccumuloException) {
+  throw (AccumuloException) e;
+} else {
+  throw new RuntimeException("Received unexpected