This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new fe2992a3125 HBASE-27806 Support dynamic reinitializing replication peer storage (#5195)
fe2992a3125 is described below

commit fe2992a31251ec1490df72773ff1c75bcb398ede
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Sun Apr 23 22:25:31 2023 +0800

    HBASE-27806 Support dynamic reinitializing replication peer storage (#5195)
    
    Signed-off-by: Liangjun He <heliang...@apache.org>
    (cherry picked from commit 18ae733b15ae2bc316f41af1a46e5619d2b35fe2)
---
 .../hbase/replication/ReplicationPeerImpl.java     |  10 +-
 .../hadoop/hbase/replication/ReplicationPeers.java |  33 ++++++-
 .../org/apache/hadoop/hbase/master/HMaster.java    |   8 +-
 .../master/replication/ReplicationPeerManager.java |  30 ++++--
 .../hadoop/hbase/regionserver/HRegionServer.java   |  17 +++-
 .../replication/regionserver/Replication.java      |  25 ++++-
 .../TestMigrateRepliationPeerStorageOnline.java    | 104 +++++++++++++++++++++
 7 files changed, 205 insertions(+), 22 deletions(-)

diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index 1bcc667fcce..2392d620597 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -23,12 +23,13 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public class ReplicationPeerImpl implements ReplicationPeer {
+public class ReplicationPeerImpl implements ReplicationPeer, ConfigurationObserver {
 
-  private final Configuration conf;
+  private volatile Configuration conf;
 
   private final String id;
 
@@ -122,4 +123,9 @@ public class ReplicationPeerImpl implements ReplicationPeer {
   public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
     this.peerConfigListeners.add(listener);
   }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    this.conf = conf;
+  }
 }
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 3a579298f45..7aa7f89ecf5 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -24,25 +24,38 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This provides an class for maintaining a set of peer clusters. These peers are remote slave
  * clusters that data is replicated to.
+ * <p>
+ * We implement {@link ConfigurationObserver} mainly for recreating the
+ * {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without
+ * restarting the region server.
  */
 @InterfaceAudience.Private
-public class ReplicationPeers {
+public class ReplicationPeers implements ConfigurationObserver {
 
-  private final Configuration conf;
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
+
+  private volatile Configuration conf;
 
   // Map of peer clusters keyed by their id
   private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
-  private final ReplicationPeerStorage peerStorage;
+  private final FileSystem fs;
+  private final ZKWatcher zookeeper;
+  private volatile ReplicationPeerStorage peerStorage;
 
   ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
     this.conf = conf;
+    this.fs = fs;
+    this.zookeeper = zookeeper;
     this.peerCache = new ConcurrentHashMap<>();
     this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
   }
@@ -134,4 +147,18 @@ public class ReplicationPeers {
     return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
       peerId, enabled, peerConfig);
   }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    this.conf = conf;
+    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
+    for (ReplicationPeerImpl peer : peerCache.values()) {
+      try {
+        peer.onConfigurationChange(
+          ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf));
+      } catch (ReplicationException e) {
+        LOG.warn("failed to reload configuration for peer {}", peer.getId(), e);
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 4abd7df915c..b61737020da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -106,7 +106,6 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
@@ -770,6 +769,7 @@ public class HMaster extends HRegionServer implements MasterServices {
 
     this.replicationPeerManager =
       ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId);
+    this.configurationManager.registerObserver(replicationPeerManager);
     this.replicationPeerModificationStateStore =
       new ReplicationPeerModificationStateStore(masterRegion);
 
@@ -4235,12 +4235,6 @@ public class HMaster extends HRegionServer implements MasterServices {
     }
   }
 
-  @RestrictedApi(explanation = "Should only be called in tests", link = "",
-      allowedOnPath = ".*/src/test/.*")
-  public ConfigurationManager getConfigurationManager() {
-    return configurationManager;
-  }
-
   private void setQuotasObserver(Configuration conf) {
     // Add the Observer to delete quotas on table deletion before starting all CPs by
     // default with quota support, avoiding if user specifically asks to not load this Observer.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 423f6590dcc..b7c3e0b4984 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -59,11 +60,14 @@ import org.apache.zookeeper.KeeperException;
  * Manages and performs all replication admin operations.
  * <p>
  * Used to add/remove a replication peer.
+ * <p>
+ * Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for
+ * supporting migrating across different replication peer storages without restarting master.
  */
 @InterfaceAudience.Private
-public class ReplicationPeerManager {
+public class ReplicationPeerManager implements ConfigurationObserver {
 
-  private final ReplicationPeerStorage peerStorage;
+  private volatile ReplicationPeerStorage peerStorage;
 
   private final ReplicationQueueStorage queueStorage;
 
@@ -71,10 +75,18 @@ public class ReplicationPeerManager {
 
   private final String clusterId;
 
-  private final Configuration conf;
+  private volatile Configuration conf;
+
+  // for dynamic recreating ReplicationPeerStorage.
+  private final FileSystem fs;
+
+  private final ZKWatcher zk;
 
-  ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
-    ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
+  ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage,
+    ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers,
+    Configuration conf, String clusterId) {
+    this.fs = fs;
+    this.zk = zk;
     this.peerStorage = peerStorage;
     this.queueStorage = queueStorage;
     this.peers = peers;
@@ -426,7 +438,7 @@ public class ReplicationPeerManager {
       boolean enabled = peerStorage.isPeerEnabled(peerId);
       peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
     }
-    return new ReplicationPeerManager(peerStorage,
+    return new ReplicationPeerManager(fs, zk, peerStorage,
       ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
   }
 
@@ -440,4 +452,10 @@ public class ReplicationPeerManager {
     }
     return s1.equals(s2);
   }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    this.conf = conf;
+    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a7ee222787f..7f65af9ee5c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -34,6 +34,7 @@ import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarker
 import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
 import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.context.Scope;
@@ -2366,6 +2367,17 @@ public class HRegionServer extends Thread
   }
 
   private void registerConfigurationObservers() {
+    // Register Replication if possible, as now we support recreating replication peer storage, for
+    // migrating across different replication peer storages online
+    if (replicationSourceHandler instanceof ConfigurationObserver) {
+      configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
+    }
+    if (
+      replicationSourceHandler != replicationSinkHandler
+        && replicationSinkHandler instanceof ConfigurationObserver
+    ) {
+      configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
+    }
     // Registering the compactSplitThread object with the ConfigurationManager.
     configurationManager.registerObserver(this.compactSplitThread);
     configurationManager.registerObserver(this.rpcServices);
@@ -3821,8 +3833,9 @@ public class HRegionServer extends Thread
   }
 
   /** Returns : Returns the ConfigurationManager object for testing purposes. */
-  @InterfaceAudience.Private
-  ConfigurationManager getConfigurationManager() {
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  public ConfigurationManager getConfigurationManager() {
     return configurationManager;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index c3c74c03fd6..de4f17a9311 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.conf.ConfigurationManager;
+import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
@@ -56,15 +58,19 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 
 /**
  * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
+ * <p>
+ * Implement {@link PropagatingConfigurationObserver} mainly for registering
+ * {@link ReplicationPeers}, so we can recreating the replication peer storage.
  */
 @InterfaceAudience.Private
-public class Replication implements ReplicationSourceService, ReplicationSinkService {
+public class Replication
+  implements ReplicationSourceService, ReplicationSinkService, PropagatingConfigurationObserver {
   private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
   private boolean isReplicationForBulkLoadDataEnabled;
   private ReplicationSourceManager replicationManager;
   private ReplicationQueueStorage queueStorage;
   private ReplicationPeers replicationPeers;
-  private Configuration conf;
+  private volatile Configuration conf;
   private ReplicationSink replicationSink;
   // Hosting server
   private Server server;
@@ -262,4 +268,19 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
     MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
     this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
   }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void registerChildren(ConfigurationManager manager) {
+    manager.registerObserver(replicationPeers);
+  }
+
+  @Override
+  public void deregisterChildren(ConfigurationManager manager) {
+    manager.deregisterObserver(replicationPeers);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java
new file mode 100644
index 00000000000..7b0b10f2008
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMigrateRepliationPeerStorageOnline.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestMigrateRepliationPeerStorageOnline {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMigrateRepliationPeerStorageOnline.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // use zookeeper first, and then migrate to filesystem
+    UTIL.getConfiguration().set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+      ReplicationPeerStorageType.ZOOKEEPER.name());
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMigrate() throws Exception {
+    Admin admin = UTIL.getAdmin();
+    ReplicationPeerConfig rpc =
+      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
+        .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
+    admin.addReplicationPeer("1", rpc);
+
+    // disable peer modification
+    admin.replicationPeerModificationSwitch(false, true);
+
+    // migrate replication peer data
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    assertEquals(0, ToolRunner.run(conf, new CopyReplicationPeers(conf),
+      new String[] { "zookeeper", "filesystem" }));
+    conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+      ReplicationPeerStorageType.FILESYSTEM.name());
+    // confirm that we have copied the data
+    ReplicationPeerStorage fsPeerStorage = ReplicationStorageFactory
+      .getReplicationPeerStorage(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), conf);
+    assertNotNull(fsPeerStorage.getPeerConfig("1"));
+
+    for (MasterThread mt : UTIL.getMiniHBaseCluster().getMasterThreads()) {
+      Configuration newConf = new Configuration(mt.getMaster().getConfiguration());
+      newConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+        ReplicationPeerStorageType.FILESYSTEM.name());
+      mt.getMaster().getConfigurationManager().notifyAllObservers(newConf);
+    }
+    for (RegionServerThread rt : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+      Configuration newConf = new Configuration(rt.getRegionServer().getConfiguration());
+      newConf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
+        ReplicationPeerStorageType.FILESYSTEM.name());
+      rt.getRegionServer().getConfigurationManager().notifyAllObservers(newConf);
+    }
+
+    admin.replicationPeerModificationSwitch(true);
+    admin.removeReplicationPeer("1");
+
+    // confirm that we will operation on the new peer storage
+    assertThat(fsPeerStorage.listPeerIds(), empty());
+  }
+}

Reply via email to