[hadoop] 09/50: HDFS-13609. [SBN read] Edit Tail Fast Path Part 3: NameNode-side changes to support tailing edits via RPC. Contributed by Erik Krogen.

2019-07-25 Thread cliang
This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit f847983f2c621d3de2c2ffbcfc1f1e40d83efa4e
Author: Erik Krogen 
AuthorDate: Tue May 22 16:45:26 2018 -0700

HDFS-13609. [SBN read] Edit Tail Fast Path Part 3: NameNode-side changes to 
support tailing edits via RPC. Contributed by Erik Krogen.
---
 .../hadoop/hdfs/qjournal/client/AsyncLogger.java   |   7 ++
 .../hdfs/qjournal/client/AsyncLoggerSet.java   |  14 +++
 .../hdfs/qjournal/client/IPCLoggerChannel.java |  14 +++
 .../hdfs/qjournal/client/QuorumJournalManager.java | 111 +-
 .../server/namenode/EditLogFileInputStream.java|  44 +++
 .../hdfs/server/namenode/ha/EditLogTailer.java |   6 +-
 .../src/main/resources/hdfs-default.xml|   4 +-
 .../qjournal/client/TestQuorumJournalManager.java  | 130 +
 .../client/TestQuorumJournalManagerUnit.java   | 101 +++-
 .../namenode/TestEditLogFileInputStream.java   |  18 +++
 10 files changed, 439 insertions(+), 10 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
index d2b48cc..7230ebc 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
@@ -22,6 +22,7 @@ import java.net.URL;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -106,6 +107,12 @@ interface AsyncLogger {
* Begin a new epoch on the target node.
*/
   public ListenableFuture newEpoch(long epoch);
+
+  /**
+   * Fetch journaled edits from the cache.
+   */
+  public ListenableFuture getJournaledEdits(
+  long fromTxnId, int maxTransactions);
   
   /**
* Fetch the list of edit logs available on the remote node.
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
index d46c2cf..15e1df6 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -261,6 +262,19 @@ class AsyncLoggerSet {
 return QuorumCall.create(calls);
   }
 
+  public QuorumCall
+  getJournaledEdits(long fromTxnId, int maxTransactions) {
+Map> calls
+= Maps.newHashMap();
+for (AsyncLogger logger : loggers) {
+  ListenableFuture future =
+  logger.getJournaledEdits(fromTxnId, maxTransactions);
+  calls.put(logger, future);
+}
+return QuorumCall.create(calls);
+  }
+
   public QuorumCall getEditLogManifest(
   long fromTxnId, boolean inProgressOk) {
 Map getJournaledEdits(
+  long fromTxnId, int maxTransactions) {
+return parallelExecutor.submit(
+new Callable() {
+  @Override
+  public GetJournaledEditsResponseProto call() throws IOException {
+return getProxy().getJournaledEdits(journalId, nameServiceId,
+fromTxnId, maxTransactions);
+  }
+});
+  }
+
+  @Override
   public ListenableFuture getEditLogManifest(
   final long fromTxnId, final boolean inProgressOk) {
 return parallelExecutor.submit(new Callable() {
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
 

[hadoop] 09/50: HDFS-13609. [SBN read] Edit Tail Fast Path Part 3: NameNode-side changes to support tailing edits via RPC. Contributed by Erik Krogen.

2019-06-28 Thread cliang
This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ac3b3cd82b3ed6403828c4f0864f9110215f
Author: Erik Krogen 
AuthorDate: Tue May 22 16:45:26 2018 -0700

HDFS-13609. [SBN read] Edit Tail Fast Path Part 3: NameNode-side changes to 
support tailing edits via RPC. Contributed by Erik Krogen.
---
 .../hadoop/hdfs/qjournal/client/AsyncLogger.java   |   7 ++
 .../hdfs/qjournal/client/AsyncLoggerSet.java   |  14 +++
 .../hdfs/qjournal/client/IPCLoggerChannel.java |  14 +++
 .../hdfs/qjournal/client/QuorumJournalManager.java | 111 +-
 .../server/namenode/EditLogFileInputStream.java|  44 +++
 .../hdfs/server/namenode/ha/EditLogTailer.java |   6 +-
 .../src/main/resources/hdfs-default.xml|   4 +-
 .../qjournal/client/TestQuorumJournalManager.java  | 130 +
 .../client/TestQuorumJournalManagerUnit.java   | 101 +++-
 .../namenode/TestEditLogFileInputStream.java   |  18 +++
 10 files changed, 439 insertions(+), 10 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
index 2633723..5eead67 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
@@ -22,6 +22,7 @@ import java.net.URL;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -107,6 +108,12 @@ interface AsyncLogger {
* Begin a new epoch on the target node.
*/
   public ListenableFuture newEpoch(long epoch);
+
+  /**
+   * Fetch journaled edits from the cache.
+   */
+  public ListenableFuture getJournaledEdits(
+  long fromTxnId, int maxTransactions);
   
   /**
* Fetch the list of edit logs available on the remote node.
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
index 6302b2a..f024b0e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import 
org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -261,6 +262,19 @@ class AsyncLoggerSet {
 return QuorumCall.create(calls);
   }
 
+  public QuorumCall
+  getJournaledEdits(long fromTxnId, int maxTransactions) {
+Map> calls
+= Maps.newHashMap();
+for (AsyncLogger logger : loggers) {
+  ListenableFuture future =
+  logger.getJournaledEdits(fromTxnId, maxTransactions);
+  calls.put(logger, future);
+}
+return QuorumCall.create(calls);
+  }
+
   public QuorumCall getEditLogManifest(
   long fromTxnId, boolean inProgressOk) {
 Map getJournaledEdits(
+  long fromTxnId, int maxTransactions) {
+return parallelExecutor.submit(
+new Callable() {
+  @Override
+  public GetJournaledEditsResponseProto call() throws IOException {
+return getProxy().getJournaledEdits(journalId, nameServiceId,
+fromTxnId, maxTransactions);
+  }
+});
+  }
+
+  @Override
   public ListenableFuture getEditLogManifest(
   final long fromTxnId, final boolean inProgressOk) {
 return parallelExecutor.submit(new Callable() {
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index