[GitHub] keith-turner commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem

2018-02-12 Thread GitBox
keith-turner commented on a change in pull request #369: [ACCUMULO-4787] Close 
input stream in AccumuloReplicaSystem
URL: https://github.com/apache/accumulo/pull/369#discussion_r167660400
 
 

 ##
 File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
 ##
 @@ -370,11 +368,112 @@ protected Status replicateLogs(ClientContext 
peerContext, final HostAndPort peer
 
 log.debug("Replication WAL to peer tserver");
 final Set tids;
-final DataInputStream input;
-Span span = Trace.start("Read WAL header");
-span.data("file", p.toString());
-try {
-  input = getWalStream(p);
+try (final FSDataInputStream fsinput = fs.open(p); final DataInputStream 
input = getWalStream(p, fsinput)) {
+  log.debug("Skipping unwanted data in WAL");
+  Span span = Trace.start("Consume WAL prefix");
+  span.data("file", p.toString());
+  try {
+// We want to read all records in the WAL up to the "begin" offset 
contained in the Status message,
+// building a Set of tids from DEFINE_TABLET events which correspond 
to table ids for future mutations
+tids = consumeWalPrefix(target, input, p, status, sizeLimit);
+  } catch (IOException e) {
+log.warn("Unexpected error consuming file.");
+return status;
+  } finally {
+span.stop();
+  }
+
+  log.debug("Sending batches of data to peer tserver");
 
 Review comment:
   Would be nice to open an issue for follow up work.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] keith-turner commented on a change in pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem

2018-02-09 Thread GitBox
keith-turner commented on a change in pull request #369: [ACCUMULO-4787] Close 
input stream in AccumuloReplicaSystem
URL: https://github.com/apache/accumulo/pull/369#discussion_r167342746
 
 

 ##
 File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
 ##
 @@ -321,60 +321,67 @@ public String execute(ReplicationCoordinator.Client 
client) throws Exception {
   protected Status replicateRFiles(ClientContext peerContext, final 
HostAndPort peerTserver, final ReplicationTarget target, final Path p, final 
Status status,
   final long sizeLimit, final String remoteTableId, final TCredentials 
tcreds, final ReplicaSystemHelper helper, long timeout) throws 
TTransportException,
   AccumuloException, AccumuloSecurityException {
-DataInputStream input;
-try {
-  input = getRFileInputStream(p);
-} catch (IOException e) {
-  log.error("Could not create input stream from RFile, will retry", e);
-  return status;
-}
-
-Status lastStatus = status, currentStatus = status;
-while (true) {
-  // Read and send a batch of mutations
-  ReplicationStats replResult = 
ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new 
RFileClientExecReturn(target, input, p,
-  currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
+try (final DataInputStream input = getRFileInputStream(p)) {
+  Status lastStatus = status, currentStatus = status;
+  while (true) {
+// Read and send a batch of mutations
+ReplicationStats replResult = 
ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new 
RFileClientExecReturn(target, input, p,
+currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
 
-  // Catch the overflow
-  long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
-  if (newBegin < 0) {
-newBegin = Long.MAX_VALUE;
-  }
+// Catch the overflow
+long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+if (newBegin < 0) {
+  newBegin = Long.MAX_VALUE;
+}
 
-  currentStatus = 
Status.newBuilder(currentStatus).setBegin(newBegin).build();
+currentStatus = 
Status.newBuilder(currentStatus).setBegin(newBegin).build();
 
-  log.debug("Sent batch for replication of {} to {}, with new Status {}", 
p, target, ProtobufUtil.toString(currentStatus));
+log.debug("Sent batch for replication of {} to {}, with new Status 
{}", p, target, ProtobufUtil.toString(currentStatus));
 
-  // If we got a different status
-  if (!currentStatus.equals(lastStatus)) {
-// If we don't have any more work, just quit
-if (!StatusUtil.isWorkRequired(currentStatus)) {
-  return currentStatus;
+// If we got a different status
+if (!currentStatus.equals(lastStatus)) {
+  // If we don't have any more work, just quit
+  if (!StatusUtil.isWorkRequired(currentStatus)) {
+return currentStatus;
+  } else {
+// Otherwise, let it loop and replicate some more data
+lastStatus = currentStatus;
+  }
 } else {
-  // Otherwise, let it loop and replicate some more data
-  lastStatus = currentStatus;
-}
-  } else {
-log.debug("Did not replicate any new data for {} to {}, (state was 
{})", p, target, ProtobufUtil.toString(lastStatus));
+  log.debug("Did not replicate any new data for {} to {}, (state was 
{})", p, target, ProtobufUtil.toString(lastStatus));
 
-// otherwise, we didn't actually replicate (likely because there was 
error sending the data)
-// we can just not record any updates, and it will be picked up again 
by the work assigner
-return status;
+  // otherwise, we didn't actually replicate (likely because there was 
error sending the data)
+  // we can just not record any updates, and it will be picked up 
again by the work assigner
+  return status;
+}
   }
+} catch (IOException e) {
+  log.error("Could not create input stream from RFile, will retry", e);
+  return status;
 }
   }
 
   protected Status replicateLogs(ClientContext peerContext, final HostAndPort 
peerTserver, final ReplicationTarget target, final Path p, final Status status,
   final long sizeLimit, final String remoteTableId, final TCredentials 
tcreds, final ReplicaSystemHelper helper, final UserGroupInformation 
accumuloUgi,
   long timeout) throws TTransportException, AccumuloException, 
AccumuloSecurityException {
-
 log.debug("Replication WAL to peer tserver");
 final Set tids;
-final DataInputStream input;
-Span span = Trace.start("Read WAL header");
-span.data("file", p.toString());
-try {
-  input = getWalStream(p);
+try (final DataInputStream input = getWalStream(p)) {
 
 Review comment: