keith-turner commented on a change in pull request #369: [ACCUMULO-4787] Close
input stream in AccumuloReplicaSystem
URL: https://github.com/apache/accumulo/pull/369#discussion_r167342746
##
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
##
@@ -321,60 +321,67 @@ public String execute(ReplicationCoordinator.Client
client) throws Exception {
protected Status replicateRFiles(ClientContext peerContext, final
HostAndPort peerTserver, final ReplicationTarget target, final Path p, final
Status status,
final long sizeLimit, final String remoteTableId, final TCredentials
tcreds, final ReplicaSystemHelper helper, long timeout) throws
TTransportException,
AccumuloException, AccumuloSecurityException {
-DataInputStream input;
-try {
- input = getRFileInputStream(p);
-} catch (IOException e) {
- log.error("Could not create input stream from RFile, will retry", e);
- return status;
-}
-
-Status lastStatus = status, currentStatus = status;
-while (true) {
- // Read and send a batch of mutations
- ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new RFileClientExecReturn(target, input, p,
- currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
+try (final DataInputStream input = getRFileInputStream(p)) {
+ Status lastStatus = status, currentStatus = status;
+ while (true) {
+// Read and send a batch of mutations
+ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new RFileClientExecReturn(target, input, p,
+currentStatus, sizeLimit, remoteTableId, tcreds), timeout);
- // Catch the overflow
- long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
- if (newBegin < 0) {
-newBegin = Long.MAX_VALUE;
- }
+// Catch the overflow
+long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+if (newBegin < 0) {
+ newBegin = Long.MAX_VALUE;
+}
- currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
- log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
+log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
- // If we got a different status
- if (!currentStatus.equals(lastStatus)) {
-// If we don't have any more work, just quit
-if (!StatusUtil.isWorkRequired(currentStatus)) {
- return currentStatus;
+// If we got a different status
+if (!currentStatus.equals(lastStatus)) {
+ // If we don't have any more work, just quit
+ if (!StatusUtil.isWorkRequired(currentStatus)) {
+return currentStatus;
+ } else {
+// Otherwise, let it loop and replicate some more data
+lastStatus = currentStatus;
+ }
} else {
- // Otherwise, let it loop and replicate some more data
- lastStatus = currentStatus;
-}
- } else {
-log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
+ log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
-// otherwise, we didn't actually replicate (likely because there was error sending the data)
-// we can just not record any updates, and it will be picked up again by the work assigner
-return status;
+ // otherwise, we didn't actually replicate (likely because there was error sending the data)
+ // we can just not record any updates, and it will be picked up again by the work assigner
+ return status;
+}
}
+} catch (IOException e) {
+ log.error("Could not create input stream from RFile, will retry", e);
+ return status;
}
}
protected Status replicateLogs(ClientContext peerContext, final HostAndPort
peerTserver, final ReplicationTarget target, final Path p, final Status status,
final long sizeLimit, final String remoteTableId, final TCredentials
tcreds, final ReplicaSystemHelper helper, final UserGroupInformation
accumuloUgi,
long timeout) throws TTransportException, AccumuloException,
AccumuloSecurityException {
-
log.debug("Replication WAL to peer tserver");
final Set tids;
-final DataInputStream input;
-Span span = Trace.start("Read WAL header");
-span.data("file", p.toString());
-try {
- input = getWalStream(p);
+try (final DataInputStream input = getWalStream(p)) {
Review comment: