[GitHub] ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem
ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem URL: https://github.com/apache/accumulo/pull/369#discussion_r167680196 ## File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ## @@ -414,7 +413,7 @@ public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, } } catch (EOFException e) { - log.warn("Got EOFException trying to read WAL header information, assuming the rest of the file (" + path + ") has no data."); + log.warn("Got EOFException trying to read WAL header information, assuming the rest of the file has no data."); Review comment: I'm not a fan of both logging *and* throwing. I think it's generally better to pick one or the other: handle an exception (by logging in this case) or pass it up. Dropping the path makes it even more obvious that this might be a redundant message... since my first instinct was to suggest adding `, e` to the `log.warn` method call, but that's probably not needed if the `LogHeaderIncompleteException` is keeping the stack trace for logging later. It's not a big deal... but something to think about if there's an obvious cleanup here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem
ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem URL: https://github.com/apache/accumulo/pull/369#discussion_r167682325 ## File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java ## @@ -113,44 +113,46 @@ public void sort(String name, Path srcPath, String destPath) { // the following call does not throw an exception if the file/dir does not exist fs.deleteRecursively(new Path(destPath)); -DFSLoggerInputStreams inputStreams; -try { - inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf); -} catch (LogHeaderIncompleteException e) { - log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting."); - // Creating a 'finished' marker will cause recovery to proceed normally and the - // empty file will be correctly ignored downstream. - fs.mkdirs(new Path(destPath)); - writeBuffer(destPath, Collections.> emptyList(), part++); - fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close(); - return; -} +try (final FSDataInputStream fsinput = fs.open(srcPath)) { + DFSLoggerInputStreams inputStreams; + try { +inputStreams = DfsLogger.readHeaderAndReturnStream(fsinput, conf); Review comment: I'm not exactly sure what kind of additional resources the crypto stream might use, but it seems to me that since `inputStreams` is not `Closeable`, it may be possible for an the `fsinput` to be closed, but the crypto stream in `inputStreams` to still be holding resources. Would need to dig further to be sure, but it would be nice if `DFSLoggerInputStreams` was `Closeable` and this was created as a second resource, in the `try-with-resources` block on line 116. Same comment applies to other uses of `DFSLoggerInputStreams`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem
ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem URL: https://github.com/apache/accumulo/pull/369#discussion_r167678913 ## File path: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ## @@ -370,11 +368,112 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer log.debug("Replication WAL to peer tserver"); final Set tids; -final DataInputStream input; -Span span = Trace.start("Read WAL header"); -span.data("file", p.toString()); -try { - input = getWalStream(p); +try (final FSDataInputStream fsinput = fs.open(p); final DataInputStream input = getWalStream(p, fsinput)) { Review comment: That might be configurable. Would have to investigate the options in newer Eclipse versions. It doesn't matter to me... if it's configurable, I'll defer to others as to what is most readable. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem
ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem URL: https://github.com/apache/accumulo/pull/369#discussion_r164253587 ## File path: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ## @@ -420,91 +420,99 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer Status lastStatus = status, currentStatus = status; final AtomicReference exceptionRef = new AtomicReference<>(); -while (true) { - // Set some trace info - span = Trace.start("Replicate WAL batch"); - span.data("Batch size (bytes)", Long.toString(sizeLimit)); - span.data("File", p.toString()); - span.data("Peer instance name", peerContext.getInstance().getInstanceName()); - span.data("Peer tserver", peerTserver.toString()); - span.data("Remote table ID", remoteTableId); - - ReplicationStats replResult; - try { -// Read and send a batch of mutations -replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, -remoteTableId, tcreds, tids), timeout); - } catch (Exception e) { -log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(), peerTserver, e); -throw e; - } finally { -span.stop(); - } - - // Catch the overflow - long newBegin = currentStatus.getBegin() + replResult.entriesConsumed; - if (newBegin < 0) { -newBegin = Long.MAX_VALUE; - } - - currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build(); +try { + while (true) { +// Set some trace info +span = Trace.start("Replicate WAL batch"); +span.data("Batch size (bytes)", Long.toString(sizeLimit)); +span.data("File", p.toString()); +span.data("Peer instance name", peerContext.getInstance().getInstanceName()); +span.data("Peer tserver", peerTserver.toString()); +span.data("Remote table ID", remoteTableId); + +ReplicationStats replResult; +try { + // Read and send a batch of mutations + replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, + remoteTableId, tcreds, tids), timeout); +} catch (Exception e) { + log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(), peerTserver, e); + throw e; +} finally { + span.stop(); +} - log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus)); +// Catch the overflow +long newBegin = currentStatus.getBegin() + replResult.entriesConsumed; +if (newBegin < 0) { + newBegin = Long.MAX_VALUE; +} - // If we got a different status - if (!currentStatus.equals(lastStatus)) { -span = Trace.start("Update replication table"); -try { - if (null != accumuloUgi) { -final Status copy = currentStatus; -accumuloUgi.doAs(new PrivilegedAction() { - @Override - public Void run() { -try { - helper.recordNewStatus(p, copy, target); -} catch (Exception e) { - exceptionRef.set(e); +currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build(); + +log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus)); + +// If we got a different status +if (!currentStatus.equals(lastStatus)) { + span = Trace.start("Update replication table"); + try { +if (null != accumuloUgi) { + final Status copy = currentStatus; + accumuloUgi.doAs(new PrivilegedAction() { +@Override +public Void run() { + try { +helper.recordNewStatus(p, copy, target); + } catch (Exception e) { +exceptionRef.set(e); + } + return null; +} + }); + Exception e = exceptionRef.get(); + if (null != e) { +if (e instanceof TableNotFoundException) { + throw (TableNotFoundException) e; +} else if (e instanceof AccumuloSecurityException) { + throw (AccumuloSecurityException) e; +} else if (e instanceof AccumuloException) { + throw (AccumuloException) e; +} else { + throw new RuntimeException("Received unexpected
[GitHub] ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem
ctubbsii commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem URL: https://github.com/apache/accumulo/pull/369#discussion_r164253697 ## File path: server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ## @@ -420,91 +420,99 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer Status lastStatus = status, currentStatus = status; final AtomicReference exceptionRef = new AtomicReference<>(); -while (true) { - // Set some trace info - span = Trace.start("Replicate WAL batch"); - span.data("Batch size (bytes)", Long.toString(sizeLimit)); - span.data("File", p.toString()); - span.data("Peer instance name", peerContext.getInstance().getInstanceName()); - span.data("Peer tserver", peerTserver.toString()); - span.data("Remote table ID", remoteTableId); - - ReplicationStats replResult; - try { -// Read and send a batch of mutations -replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, -remoteTableId, tcreds, tids), timeout); - } catch (Exception e) { -log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(), peerTserver, e); -throw e; - } finally { -span.stop(); - } - - // Catch the overflow - long newBegin = currentStatus.getBegin() + replResult.entriesConsumed; - if (newBegin < 0) { -newBegin = Long.MAX_VALUE; - } - - currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build(); +try { + while (true) { +// Set some trace info +span = Trace.start("Replicate WAL batch"); +span.data("Batch size (bytes)", Long.toString(sizeLimit)); +span.data("File", p.toString()); +span.data("Peer instance name", peerContext.getInstance().getInstanceName()); +span.data("Peer tserver", peerTserver.toString()); +span.data("Remote table ID", remoteTableId); + +ReplicationStats replResult; +try { + // Read and send a batch of mutations + replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, + remoteTableId, tcreds, tids), timeout); +} catch (Exception e) { + log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(), peerTserver, e); + throw e; +} finally { + span.stop(); +} - log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus)); +// Catch the overflow +long newBegin = currentStatus.getBegin() + replResult.entriesConsumed; +if (newBegin < 0) { + newBegin = Long.MAX_VALUE; +} - // If we got a different status - if (!currentStatus.equals(lastStatus)) { -span = Trace.start("Update replication table"); -try { - if (null != accumuloUgi) { -final Status copy = currentStatus; -accumuloUgi.doAs(new PrivilegedAction() { - @Override - public Void run() { -try { - helper.recordNewStatus(p, copy, target); -} catch (Exception e) { - exceptionRef.set(e); +currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build(); + +log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus)); + +// If we got a different status +if (!currentStatus.equals(lastStatus)) { + span = Trace.start("Update replication table"); + try { +if (null != accumuloUgi) { + final Status copy = currentStatus; + accumuloUgi.doAs(new PrivilegedAction() { +@Override +public Void run() { + try { +helper.recordNewStatus(p, copy, target); + } catch (Exception e) { +exceptionRef.set(e); + } + return null; +} + }); + Exception e = exceptionRef.get(); + if (null != e) { +if (e instanceof TableNotFoundException) { + throw (TableNotFoundException) e; +} else if (e instanceof AccumuloSecurityException) { + throw (AccumuloSecurityException) e; +} else if (e instanceof AccumuloException) { + throw (AccumuloException) e; +} else { + throw new RuntimeException("Received unexpected