[hadoop] 09/50: HDFS-13609. [SBN read] Edit Tail Fast Path Part 3: NameNode-side changes to support tailing edits via RPC. Contributed by Erik Krogen.
This is an automated email from the ASF dual-hosted git repository. cliang pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/hadoop.git commit f847983f2c621d3de2c2ffbcfc1f1e40d83efa4e Author: Erik Krogen AuthorDate: Tue May 22 16:45:26 2018 -0700 HDFS-13609. [SBN read] Edit Tail Fast Path Part 3: NameNode-side changes to support tailing edits via RPC. Contributed by Erik Krogen. --- .../hadoop/hdfs/qjournal/client/AsyncLogger.java | 7 ++ .../hdfs/qjournal/client/AsyncLoggerSet.java | 14 +++ .../hdfs/qjournal/client/IPCLoggerChannel.java | 14 +++ .../hdfs/qjournal/client/QuorumJournalManager.java | 111 +- .../server/namenode/EditLogFileInputStream.java| 44 +++ .../hdfs/server/namenode/ha/EditLogTailer.java | 6 +- .../src/main/resources/hdfs-default.xml| 4 +- .../qjournal/client/TestQuorumJournalManager.java | 130 + .../client/TestQuorumJournalManagerUnit.java | 101 +++- .../namenode/TestEditLogFileInputStream.java | 18 +++ 10 files changed, 439 insertions(+), 10 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java index d2b48cc..7230ebc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java @@ -22,6 +22,7 @@ import java.net.URL; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; @@ -106,6 +107,12 @@ interface AsyncLogger { * Begin a new epoch on the target node. */ public ListenableFuture<NewEpochResponseProto> newEpoch(long epoch); + + /** + * Fetch journaled edits from the cache. + */ + public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits( + long fromTxnId, int maxTransactions); /** * Fetch the list of edit logs available on the remote node. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java index d46c2cf..15e1df6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; @@ -261,6 +262,19 @@ class AsyncLoggerSet { return QuorumCall.create(calls); } + public QuorumCall<AsyncLogger, GetJournaledEditsResponseProto> + getJournaledEdits(long fromTxnId, int maxTransactions) { +Map<AsyncLogger, + ListenableFuture<GetJournaledEditsResponseProto>> calls += Maps.newHashMap(); +for (AsyncLogger logger : loggers) { + ListenableFuture<GetJournaledEditsResponseProto> future = + logger.getJournaledEdits(fromTxnId, maxTransactions); + calls.put(logger, future); +} +return QuorumCall.create(calls); + } + public QuorumCall getEditLogManifest( long fromTxnId, boolean 
inProgressOk) { Map getJournaledEdits( + long fromTxnId, int maxTransactions) { +return parallelExecutor.submit( +new Callable() { + @Override + public GetJournaledEditsResponseProto call() throws IOException { +return getProxy().getJournaledEdits(journalId, nameServiceId, +fromTxnId, maxTransactions); + } +}); + } + + @Override public ListenableFuture getEditLogManifest( final long fromTxnId, final boolean inProgressOk) { return parallelExecutor.submit(new Callable() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
[hadoop] 09/50: HDFS-13609. [SBN read] Edit Tail Fast Path Part 3: NameNode-side changes to support tailing edits via RPC. Contributed by Erik Krogen.
This is an automated email from the ASF dual-hosted git repository. cliang pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/hadoop.git commit ac3b3cd82b3ed6403828c4f0864f9110215f Author: Erik Krogen AuthorDate: Tue May 22 16:45:26 2018 -0700 HDFS-13609. [SBN read] Edit Tail Fast Path Part 3: NameNode-side changes to support tailing edits via RPC. Contributed by Erik Krogen. --- .../hadoop/hdfs/qjournal/client/AsyncLogger.java | 7 ++ .../hdfs/qjournal/client/AsyncLoggerSet.java | 14 +++ .../hdfs/qjournal/client/IPCLoggerChannel.java | 14 +++ .../hdfs/qjournal/client/QuorumJournalManager.java | 111 +- .../server/namenode/EditLogFileInputStream.java| 44 +++ .../hdfs/server/namenode/ha/EditLogTailer.java | 6 +- .../src/main/resources/hdfs-default.xml| 4 +- .../qjournal/client/TestQuorumJournalManager.java | 130 + .../client/TestQuorumJournalManagerUnit.java | 101 +++- .../namenode/TestEditLogFileInputStream.java | 18 +++ 10 files changed, 439 insertions(+), 10 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java index 2633723..5eead67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java @@ -22,6 +22,7 @@ import java.net.URL; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; @@ -107,6 +108,12 @@ interface AsyncLogger { * Begin a new epoch on the target node. */ public ListenableFuture<NewEpochResponseProto> newEpoch(long epoch); + + /** + * Fetch journaled edits from the cache. + */ + public ListenableFuture<GetJournaledEditsResponseProto> getJournaledEdits( + long fromTxnId, int maxTransactions); /** * Fetch the list of edit logs available on the remote node. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java index 6302b2a..f024b0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; @@ -261,6 +262,19 @@ class AsyncLoggerSet { return QuorumCall.create(calls); } + public QuorumCall<AsyncLogger, GetJournaledEditsResponseProto> + getJournaledEdits(long fromTxnId, int maxTransactions) { +Map<AsyncLogger, + ListenableFuture<GetJournaledEditsResponseProto>> calls += Maps.newHashMap(); +for (AsyncLogger logger : loggers) { + ListenableFuture<GetJournaledEditsResponseProto> future = + logger.getJournaledEdits(fromTxnId, maxTransactions); + calls.put(logger, future); +} +return QuorumCall.create(calls); + } + public QuorumCall getEditLogManifest( long fromTxnId, boolean inProgressOk) { Map 
getJournaledEdits( + long fromTxnId, int maxTransactions) { +return parallelExecutor.submit( +new Callable() { + @Override + public GetJournaledEditsResponseProto call() throws IOException { +return getProxy().getJournaledEdits(journalId, nameServiceId, +fromTxnId, maxTransactions); + } +}); + } + + @Override public ListenableFuture getEditLogManifest( final long fromTxnId, final boolean inProgressOk) { return parallelExecutor.submit(new Callable() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index