adamjshook closed pull request #369: [ACCUMULO-4787] Close input stream in AccumuloReplicaSystem (URL: https://github.com/apache/accumulo/pull/369)
This pull request was merged from a forked repository. Because GitHub hides the original diff of a merged pull request from a fork, the diff is reproduced below for the sake of provenance: diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 395e18fba2..f2d36fd44c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -337,8 +337,7 @@ public DfsLogger(ServerResources conf, String filename, String meta) throws IOEx metaReference = meta; } - public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException { - FSDataInputStream input = fs.open(path); + public static DFSLoggerInputStreams readHeaderAndReturnStream(FSDataInputStream input, AccumuloConfiguration conf) throws IOException { DataInputStream decryptingInput = null; byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes(UTF_8); @@ -414,7 +413,7 @@ public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, } } catch (EOFException e) { - log.warn("Got EOFException trying to read WAL header information, assuming the rest of the file (" + path + ") has no data."); + log.warn("Got EOFException trying to read WAL header information, assuming the rest of the file has no data."); // A TabletServer might have died before the (complete) header was written throw new LogHeaderIncompleteException(e); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index 11097ceaea..ba5e488226 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@ -113,44 +113,46 @@ public void sort(String name, Path srcPath, String destPath) { // the following call does not throw an exception if the file/dir does not exist fs.deleteRecursively(new Path(destPath)); - DFSLoggerInputStreams inputStreams; - try { - inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf); - } catch (LogHeaderIncompleteException e) { - log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting."); - // Creating a 'finished' marker will cause recovery to proceed normally and the - // empty file will be correctly ignored downstream. - fs.mkdirs(new Path(destPath)); - writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++); - fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close(); - return; - } + try (final FSDataInputStream fsinput = fs.open(srcPath)) { + DFSLoggerInputStreams inputStreams; + try { + inputStreams = DfsLogger.readHeaderAndReturnStream(fsinput, conf); + } catch (LogHeaderIncompleteException e) { + log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting."); + // Creating a 'finished' marker will cause recovery to proceed normally and the + // empty file will be correctly ignored downstream. 
+ fs.mkdirs(new Path(destPath)); + writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++); + fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close(); + return; + } - this.input = inputStreams.getOriginalInput(); - this.decryptingInput = inputStreams.getDecryptingInputStream(); + this.input = inputStreams.getOriginalInput(); + this.decryptingInput = inputStreams.getDecryptingInputStream(); - final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE); - Thread.currentThread().setName("Sorting " + name + " for recovery"); - while (true) { - final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>(); - try { - long start = input.getPos(); - while (input.getPos() - start < bufferSize) { - LogFileKey key = new LogFileKey(); - LogFileValue value = new LogFileValue(); - key.readFields(decryptingInput); - value.readFields(decryptingInput); - buffer.add(new Pair<>(key, value)); + final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE); + Thread.currentThread().setName("Sorting " + name + " for recovery"); + while (true) { + final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>(); + try { + long start = input.getPos(); + while (input.getPos() - start < bufferSize) { + LogFileKey key = new LogFileKey(); + LogFileValue value = new LogFileValue(); + key.readFields(decryptingInput); + value.readFields(decryptingInput); + buffer.add(new Pair<>(key, value)); + } + writeBuffer(destPath, buffer, part++); + buffer.clear(); + } catch (EOFException ex) { + writeBuffer(destPath, buffer, part++); + break; } - writeBuffer(destPath, buffer, part++); - buffer.clear(); - } catch (EOFException ex) { - writeBuffer(destPath, buffer, part++); - break; } + fs.create(new Path(destPath, "finished")).close(); + log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms"); } - fs.create(new Path(destPath, "finished")).close(); 
- log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms"); } catch (Throwable t) { try { // parent dir may not exist diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java index d5a4db9bda..fb286c4ccf 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java @@ -38,6 +38,7 @@ import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams; import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException; import org.apache.accumulo.tserver.log.MultiReader; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.slf4j.Logger; @@ -100,28 +101,30 @@ public static void main(String[] args) throws IOException { LogFileValue value = new LogFileValue(); if (fs.isFile(path)) { - // read log entries from a simple hdfs file - DFSLoggerInputStreams streams; - try { - streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getInstance()); - } catch (LogHeaderIncompleteException e) { - log.warn("Could not read header for " + path + ". Ignoring..."); - continue; - } - DataInputStream input = streams.getDecryptingInputStream(); - - try { - while (true) { - try { - key.readFields(input); - value.readFields(input); - } catch (EOFException ex) { - break; + try (final FSDataInputStream fsinput = fs.open(path)) { + // read log entries from a simple hdfs file + DFSLoggerInputStreams streams; + try { + streams = DfsLogger.readHeaderAndReturnStream(fsinput, SiteConfiguration.getInstance()); + } catch (LogHeaderIncompleteException e) { + log.warn("Could not read header for " + path + ". 
Ignoring..."); + continue; + } + DataInputStream input = streams.getDecryptingInputStream(); + + try { + while (true) { + try { + key.readFields(input); + value.readFields(input); + } catch (EOFException ex) { + break; + } + printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations); } - printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations); + } finally { + input.close(); } - } finally { - input.close(); } } else { // read the log entries sorted in a map file diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java index 24b7756d22..97c8b7eab6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java @@ -77,6 +77,7 @@ import org.apache.accumulo.tserver.logger.LogFileKey; import org.apache.accumulo.tserver.logger.LogFileValue; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.transport.TTransportException; @@ -321,46 +322,43 @@ public String execute(ReplicationCoordinator.Client client) throws Exception { protected Status replicateRFiles(ClientContext peerContext, final HostAndPort peerTserver, final ReplicationTarget target, final Path p, final Status status, final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper, long timeout) throws TTransportException, AccumuloException, AccumuloSecurityException { - DataInputStream input; - try { - input = getRFileInputStream(p); - } catch (IOException e) { - log.error("Could not create input stream from RFile, will retry", e); - return status; - } - - Status lastStatus = status, 
currentStatus = status; - while (true) { - // Read and send a batch of mutations - ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new RFileClientExecReturn(target, input, p, - currentStatus, sizeLimit, remoteTableId, tcreds), timeout); + try (final DataInputStream input = getRFileInputStream(p)) { + Status lastStatus = status, currentStatus = status; + while (true) { + // Read and send a batch of mutations + ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new RFileClientExecReturn(target, input, p, + currentStatus, sizeLimit, remoteTableId, tcreds), timeout); - // Catch the overflow - long newBegin = currentStatus.getBegin() + replResult.entriesConsumed; - if (newBegin < 0) { - newBegin = Long.MAX_VALUE; - } + // Catch the overflow + long newBegin = currentStatus.getBegin() + replResult.entriesConsumed; + if (newBegin < 0) { + newBegin = Long.MAX_VALUE; + } - currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build(); + currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build(); - log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus)); + log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus)); - // If we got a different status - if (!currentStatus.equals(lastStatus)) { - // If we don't have any more work, just quit - if (!StatusUtil.isWorkRequired(currentStatus)) { - return currentStatus; + // If we got a different status + if (!currentStatus.equals(lastStatus)) { + // If we don't have any more work, just quit + if (!StatusUtil.isWorkRequired(currentStatus)) { + return currentStatus; + } else { + // Otherwise, let it loop and replicate some more data + lastStatus = currentStatus; + } } else { - // Otherwise, let it loop and replicate some more data - lastStatus = currentStatus; - } - } else { - 
log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus)); + log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus)); - // otherwise, we didn't actually replicate (likely because there was error sending the data) - // we can just not record any updates, and it will be picked up again by the work assigner - return status; + // otherwise, we didn't actually replicate (likely because there was error sending the data) + // we can just not record any updates, and it will be picked up again by the work assigner + return status; + } } + } catch (IOException e) { + log.error("Could not create input stream from RFile, will retry", e); + return status; } } @@ -370,11 +368,112 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer log.debug("Replication WAL to peer tserver"); final Set<Integer> tids; - final DataInputStream input; - Span span = Trace.start("Read WAL header"); - span.data("file", p.toString()); - try { - input = getWalStream(p); + try (final FSDataInputStream fsinput = fs.open(p); final DataInputStream input = getWalStream(p, fsinput)) { + log.debug("Skipping unwanted data in WAL"); + Span span = Trace.start("Consume WAL prefix"); + span.data("file", p.toString()); + try { + // We want to read all records in the WAL up to the "begin" offset contained in the Status message, + // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations + tids = consumeWalPrefix(target, input, p, status, sizeLimit); + } catch (IOException e) { + log.warn("Unexpected error consuming file."); + return status; + } finally { + span.stop(); + } + + log.debug("Sending batches of data to peer tserver"); + + Status lastStatus = status, currentStatus = status; + final AtomicReference<Exception> exceptionRef = new AtomicReference<>(); + while (true) { + // Set some trace info + span = 
Trace.start("Replicate WAL batch"); + span.data("Batch size (bytes)", Long.toString(sizeLimit)); + span.data("File", p.toString()); + span.data("Peer instance name", peerContext.getInstance().getInstanceName()); + span.data("Peer tserver", peerTserver.toString()); + span.data("Remote table ID", remoteTableId); + + ReplicationStats replResult; + try { + // Read and send a batch of mutations + replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, + sizeLimit, remoteTableId, tcreds, tids), timeout); + } catch (Exception e) { + log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(), peerTserver, e); + throw e; + } finally { + span.stop(); + } + + // Catch the overflow + long newBegin = currentStatus.getBegin() + replResult.entriesConsumed; + if (newBegin < 0) { + newBegin = Long.MAX_VALUE; + } + + currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build(); + + log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus)); + + // If we got a different status + if (!currentStatus.equals(lastStatus)) { + span = Trace.start("Update replication table"); + try { + if (null != accumuloUgi) { + final Status copy = currentStatus; + accumuloUgi.doAs(new PrivilegedAction<Void>() { + @Override + public Void run() { + try { + helper.recordNewStatus(p, copy, target); + } catch (Exception e) { + exceptionRef.set(e); + } + return null; + } + }); + Exception e = exceptionRef.get(); + if (null != e) { + if (e instanceof TableNotFoundException) { + throw (TableNotFoundException) e; + } else if (e instanceof AccumuloSecurityException) { + throw (AccumuloSecurityException) e; + } else if (e instanceof AccumuloException) { + throw (AccumuloException) e; + } else { + throw new RuntimeException("Received unexpected exception", e); + } + } + } else { + helper.recordNewStatus(p, 
currentStatus, target); + } + } catch (TableNotFoundException e) { + log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e); + throw new RuntimeException("Replication table did not exist, will retry", e); + } finally { + span.stop(); + } + + log.debug("Recorded updated status for {}: {}", p, ProtobufUtil.toString(currentStatus)); + + // If we don't have any more work, just quit + if (!StatusUtil.isWorkRequired(currentStatus)) { + return currentStatus; + } else { + // Otherwise, let it loop and replicate some more data + lastStatus = currentStatus; + } + } else { + log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus)); + + // otherwise, we didn't actually replicate (likely because there was error sending the data) + // we can just not record any updates, and it will be picked up again by the work assigner + return status; + } + } } catch (LogHeaderIncompleteException e) { log.warn("Could not read header from {}, assuming that there is no data present in the WAL, therefore replication is complete", p); Status newStatus; @@ -384,7 +483,7 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer } else { newStatus = Status.newBuilder(status).setBegin(status.getEnd()).build(); } - span = Trace.start("Update replication table"); + Span span = Trace.start("Update replication table"); try { helper.recordNewStatus(p, newStatus, target); } catch (TableNotFoundException tnfe) { @@ -398,114 +497,6 @@ protected Status replicateLogs(ClientContext peerContext, final HostAndPort peer log.error("Could not create stream for WAL", e); // No data sent (bytes nor records) and no progress made return status; - } finally { - span.stop(); - } - - log.debug("Skipping unwanted data in WAL"); - span = Trace.start("Consume WAL prefix"); - span.data("file", p.toString()); - try { - // We want to read all records in 
the WAL up to the "begin" offset contained in the Status message, - // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations - tids = consumeWalPrefix(target, input, p, status, sizeLimit); - } catch (IOException e) { - log.warn("Unexpected error consuming file."); - return status; - } finally { - span.stop(); - } - - log.debug("Sending batches of data to peer tserver"); - - Status lastStatus = status, currentStatus = status; - final AtomicReference<Exception> exceptionRef = new AtomicReference<>(); - while (true) { - // Set some trace info - span = Trace.start("Replicate WAL batch"); - span.data("Batch size (bytes)", Long.toString(sizeLimit)); - span.data("File", p.toString()); - span.data("Peer instance name", peerContext.getInstance().getInstanceName()); - span.data("Peer tserver", peerTserver.toString()); - span.data("Remote table ID", remoteTableId); - - ReplicationStats replResult; - try { - // Read and send a batch of mutations - replResult = ReplicationClient.executeServicerWithReturn(peerContext, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, - remoteTableId, tcreds, tids), timeout); - } catch (Exception e) { - log.error("Caught exception replicating data to {} at {}", peerContext.getInstance().getInstanceName(), peerTserver, e); - throw e; - } finally { - span.stop(); - } - - // Catch the overflow - long newBegin = currentStatus.getBegin() + replResult.entriesConsumed; - if (newBegin < 0) { - newBegin = Long.MAX_VALUE; - } - - currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build(); - - log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus)); - - // If we got a different status - if (!currentStatus.equals(lastStatus)) { - span = Trace.start("Update replication table"); - try { - if (null != accumuloUgi) { - final Status copy = currentStatus; - accumuloUgi.doAs(new PrivilegedAction<Void>() { - 
@Override - public Void run() { - try { - helper.recordNewStatus(p, copy, target); - } catch (Exception e) { - exceptionRef.set(e); - } - return null; - } - }); - Exception e = exceptionRef.get(); - if (null != e) { - if (e instanceof TableNotFoundException) { - throw (TableNotFoundException) e; - } else if (e instanceof AccumuloSecurityException) { - throw (AccumuloSecurityException) e; - } else if (e instanceof AccumuloException) { - throw (AccumuloException) e; - } else { - throw new RuntimeException("Received unexpected exception", e); - } - } - } else { - helper.recordNewStatus(p, currentStatus, target); - } - } catch (TableNotFoundException e) { - log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e); - throw new RuntimeException("Replication table did not exist, will retry", e); - } finally { - span.stop(); - } - - log.debug("Recorded updated status for {}: {}", p, ProtobufUtil.toString(currentStatus)); - - // If we don't have any more work, just quit - if (!StatusUtil.isWorkRequired(currentStatus)) { - return currentStatus; - } else { - // Otherwise, let it loop and replicate some more data - lastStatus = currentStatus; - } - } else { - log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus)); - - // otherwise, we didn't actually replicate (likely because there was error sending the data) - // we can just not record any updates, and it will be picked up again by the work assigner - return status; - } } } @@ -687,9 +678,15 @@ protected RFileReplication getKeyValues(ReplicationTarget target, DataInputStrea return tids; } - public DataInputStream getWalStream(Path p) throws IOException { - DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf); - return streams.getDecryptingInputStream(); + public DataInputStream getWalStream(Path p, FSDataInputStream input) throws IOException { + Span 
span = Trace.start("Read WAL header"); + span.data("file", p.toString()); + try { + DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(input, conf); + return streams.getDecryptingInputStream(); + } finally { + span.stop(); + } } protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit, Set<Integer> desiredTids) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log in to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services