This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 0f82a447585 HBASE-28155 RecoveredReplicationSource quit when there are still unfinished groups (#5466)
0f82a447585 is described below

commit 0f82a447585b33d93dbc00a6f5da3dcdd1a5c571
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Fri Oct 20 11:58:28 2023 +0800

    HBASE-28155 RecoveredReplicationSource quit when there are still unfinished groups (#5466)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
    (cherry picked from commit dde504ce489fd3fd55166a872768a077400ba2ab)
---
 .../regionserver/RecoveredReplicationSource.java   | 16 ++++++++++++
 .../regionserver/ReplicationSource.java            | 29 ++++++++++++++++------
 2 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index e9062472221..e47df36e3aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -26,6 +26,22 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class RecoveredReplicationSource extends ReplicationSource {
 
+  @Override
+  protected void startShippers() {
+    for (String walGroupId : logQueue.getQueues().keySet()) {
+      workerThreads.put(walGroupId, createNewShipper(walGroupId));
+    }
+    // start shippers after initializing the workerThreads, as in the below 
postFinish logic, if
+    // workerThreads is empty, we will mark the RecoveredReplicationSource as 
finished. So if we
+    // start the worker on the fly, it is possible that a shipper has already 
finished its work and
+    // called postFinish, and find out the workerThreads is empty and then 
mark the
+    // RecoveredReplicationSource as finish, while the next shipper has not 
been added to
+    // workerThreads yet. See HBASE-28155 for more details.
+    for (ReplicationSourceShipper shipper : workerThreads.values()) {
+      startShipper(shipper);
+    }
+  }
+
   @Override
   protected RecoveredReplicationSourceShipper createNewShipper(String 
walGroupId,
     ReplicationSourceWALReader walReader) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4c864e5e450..094fa4aaa78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -360,6 +360,19 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
   }
 
+  protected final ReplicationSourceShipper createNewShipper(String walGroupId) 
{
+    ReplicationSourceWALReader walReader =
+      createNewWALReader(walGroupId, getStartOffset(walGroupId));
+    ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
+    Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
+      + ".replicationSource.wal-reader." + walGroupId + "," + queueId, 
this::retryRefreshing);
+    return worker;
+  }
+
+  protected final void startShipper(ReplicationSourceShipper worker) {
+    worker.startup(this::retryRefreshing);
+  }
+
   private void tryStartNewShipper(String walGroupId) {
     workerThreads.compute(walGroupId, (key, value) -> {
       if (value != null) {
@@ -367,12 +380,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
         return value;
       } else {
         LOG.debug("{} starting shipping worker for walGroupId={}", 
logPeerId(), walGroupId);
-        ReplicationSourceWALReader walReader =
-          createNewWALReader(walGroupId, getStartOffset(walGroupId));
-        ReplicationSourceShipper worker = createNewShipper(walGroupId, 
walReader);
-        Threads.setDaemonThreadRunning(walReader, 
Thread.currentThread().getName()
-          + ".replicationSource.wal-reader." + walGroupId + "," + queueId, 
this::retryRefreshing);
-        worker.startup(this::retryRefreshing);
+        ReplicationSourceShipper worker = createNewShipper(walGroupId);
+        startShipper(worker);
         return worker;
       }
     });
@@ -522,7 +531,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
    * @param sleepMultiplier by how many times the default sleeping time is 
augmented
    * @return True if <code>sleepMultiplier</code> is &lt; 
<code>maxRetriesMultiplier</code>
    */
-  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
+  private boolean sleepForRetries(String msg, int sleepMultiplier) {
     try {
       if (LOG.isTraceEnabled()) {
         LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, 
sleepForRetries,
@@ -605,10 +614,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
       queueId, logQueue.getNumQueues(), clusterId, peerClusterId);
     initializeWALEntryFilter(peerClusterId);
     // Start workers
+    startShippers();
+    setSourceStartupStatus(false);
+  }
+
+  protected void startShippers() {
     for (String walGroupId : logQueue.getQueues().keySet()) {
       tryStartNewShipper(walGroupId);
     }
-    setSourceStartupStatus(false);
   }
 
   private synchronized void setSourceStartupStatus(boolean initializing) {

Reply via email to