saintstack commented on a change in pull request #2364:
URL: https://github.com/apache/hbase/pull/2364#discussion_r485115878



##########
File path: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
##########
@@ -979,6 +979,8 @@
   /*
    * cluster replication constants.
    */
+  public static final String REPLICATION_OFFLOAD_ENABLE_KEY = 
"hbase.replication.offload.enabled";
+  public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false;

Review comment:
       Only used in ReplicationSourceManager so define these there?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceOverallController.java
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.replication;
+
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used to control all replication sources inside one RegionServer or 
ReplicationServer.
+ * Used by {@link ReplicationSource} or {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public interface ReplicationSourceOverallController {

Review comment:
       Can it just be ReplicationSoruceController? Drop the 'Overall'?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -244,7 +286,7 @@ private void adoptAbandonedQueues() {
    * </ol>
    * @param peerId the id of replication peer
    */
-  public void addPeer(String peerId) throws IOException {
+  void addPeer(String peerId) throws IOException {

Review comment:
       Funny, I want it to stay public in my current work (meta region 
replicas). Can move it back in my patch.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -268,7 +310,7 @@ public void addPeer(String peerId) throws IOException {
    * </ol>
    * @param peerId the id of the replication peer
    */
-  public void removePeer(String peerId) {
+  void removePeer(String peerId) {

Review comment:
       Ditto

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -186,12 +188,11 @@ public ReplicationSourceManager(ReplicationQueueStorage 
queueStorage,
     this.latestPaths = new HashMap<>();
     this.replicationForBulkLoadDataEnabled = conf.getBoolean(
       HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, 
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
-    this.sleepForRetries = 
this.conf.getLong("replication.source.sync.sleepforretries", 1000);
-    this.maxRetriesMultiplier =
-      this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
     this.totalBufferLimit = 
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
         HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     this.globalMetrics = globalMetrics;
+    this.replicationOffload = 
conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY,

Review comment:
       I should go back to the design but we can't just have the offload be a 
ReplicationSource implementation? Because you need to span all 
ReplicationSources?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -92,7 +93,8 @@
  * </ul>
  */
 @InterfaceAudience.Private
-public class ReplicationSourceManager implements ReplicationListener {
+public class ReplicationSourceManager implements ReplicationListener,
+  ReplicationSourceOverallController {

Review comment:
       Yeah, if this implements Controller, you have to pass it in to 
ReplicationSource #Init as a distinct arg?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceOverallController.java
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.replication;
+
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used to control all replication sources inside one RegionServer or 
ReplicationServer.
+ * Used by {@link ReplicationSource} or {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public interface ReplicationSourceOverallController {
+
+  /**
+   * Returns the maximum size in bytes of edits held in memory which are 
pending replication
+   * across all sources inside this RegionServer or ReplicationServer.
+   */
+  long getTotalBufferLimit();
+
+  AtomicLong getTotalBufferUsed();
+
+  MetricsReplicationGlobalSourceSource getGlobalMetrics();

Review comment:
       What sort of 'global' metrics? This is metrics for all replication 
sources?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceOverallController.java
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.replication;

Review comment:
       License

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceOverallController.java
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.replication;
+
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used to control all replication sources inside one RegionServer or 
ReplicationServer.
+ * Used by {@link ReplicationSource} or {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public interface ReplicationSourceOverallController {
+
+  /**
+   * Returns the maximum size in bytes of edits held in memory which are 
pending replication
+   * across all sources inside this RegionServer or ReplicationServer.
+   */
+  long getTotalBufferLimit();

Review comment:
       Is this a 'size' rather than a 'limit'? It is count of how many bytes 
are currently accumulated in replication source memory?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
##########
@@ -45,18 +46,15 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RecoveredReplicationSource.class);
 
-  private Path walDir;
-
   private String actualPeerId;
 
   @Override
-  public void init(Configuration conf, FileSystem fs, Path walDir, 
ReplicationSourceManager manager,
-    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, 
Server server,
-    String peerClusterZnode, UUID clusterId, WALFileLengthProvider 
walFileLengthProvider,
-    MetricsSource metrics) throws IOException {
-    super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, 
server, peerClusterZnode,
-      clusterId, walFileLengthProvider, metrics);
-    this.walDir = walDir;
+  public void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceOverallController overallController, 
ReplicationQueueStorage queueStorage,

Review comment:
       There is only one ReplicationSourceManager instance in a RS? If so, can 
it carry the Controller rather than as here where it is another param on this 
init method?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
##########
@@ -267,10 +267,11 @@ public Path getCurrentPath() {
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
     // try not to go over total quota
-    if (source.manager.getTotalBufferUsed().get() > 
source.manager.getTotalBufferLimit()) {
+    if (source.overallController.getTotalBufferUsed().get() > 
source.overallController

Review comment:
       Yeah, if the Controller is in the ReplicationSourceManager, then the 
'overall' will be redundant?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceOverallController.java
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.replication;
+
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used to control all replication sources inside one RegionServer or 
ReplicationServer.
+ * Used by {@link ReplicationSource} or {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public interface ReplicationSourceOverallController {
+
+  /**
+   * Returns the maximum size in bytes of edits held in memory which are 
pending replication
+   * across all sources inside this RegionServer or ReplicationServer.
+   */
+  long getTotalBufferLimit();
+
+  AtomicLong getTotalBufferUsed();
+
+  MetricsReplicationGlobalSourceSource getGlobalMetrics();
+
+  /**
+   * Called this when the recovered replication source replicated all WALs.
+   * @param src

Review comment:
       Remove '@param src'. s/Called/Call/

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -849,19 +874,6 @@ int getSizeOfLatestPath() {
     }
   }
 
-  @VisibleForTesting
-  public AtomicLong getTotalBufferUsed() {

Review comment:
       I see, you are just moving existing methods.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -244,7 +286,7 @@ private void adoptAbandonedQueues() {
    * </ol>
    * @param peerId the id of replication peer
    */
-  public void addPeer(String peerId) throws IOException {
+  void addPeer(String peerId) throws IOException {

Review comment:
       I want to call it on Region open. Let me see if I can move where I make 
the call.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceOverallController.java
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.replication;
+
+import 
org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import 
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Used to control all replication sources inside one RegionServer or 
ReplicationServer.
+ * Used by {@link ReplicationSource} or {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public interface ReplicationSourceOverallController {
+
+  /**
+   * Returns the maximum size in bytes of edits held in memory which are 
pending replication
+   * across all sources inside this RegionServer or ReplicationServer.
+   */
+  long getTotalBufferLimit();

Review comment:
       Maybe the comment is just in the wrong place.. should be on the next 
data member?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to