[hbase] 12/14: HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150)

2023-04-21 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit e4f34a2aca735b707a60f5d33e5d69884ab8c204
Author: Liangjun He 
AuthorDate: Wed Apr 5 23:37:04 2023 +0800

HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150)

Signed-off-by: Duo Zhang 
---
 .../regionserver/ReplicationSyncUp.java| 46 --
 .../replication/TestReplicationSyncUpTool.java | 36 +
 .../replication/TestReplicationSyncUpToolBase.java |  7 +++-
 3 files changed, 84 insertions(+), 5 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index f071cf6f1f8..cd6a4d9ac4d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
@@ -182,7 +184,7 @@ public class ReplicationSyncUp extends Configured 
implements Tool {
 }
   }
 
-  private void writeInfoFile(FileSystem fs) throws IOException {
+  private void writeInfoFile(FileSystem fs, boolean isForce) throws 
IOException {
 // Record the info of this run. Currently only record the time we run the 
job. We will use this
 // timestamp to clean up the data for last sequence ids and hfile refs in 
replication queue
 // storage. See 
ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore.
@@ -190,11 +192,48 @@ public class ReplicationSyncUp extends Configured 
implements Tool {
   new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
 String json = JsonMapper.writeObjectAsString(info);
 Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
-try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), 
false)) {
+try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), 
isForce)) {
   out.write(Bytes.toBytes(json));
 }
   }
 
+  private static boolean parseOpts(String args[]) {
+LinkedList<String> argv = new LinkedList<>();
+argv.addAll(Arrays.asList(args));
+String cmd = null;
+while ((cmd = argv.poll()) != null) {
+  if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
+printUsageAndExit(null, 0);
+  }
+  if (cmd.equals("-f")) {
+return true;
+  }
+  if (!argv.isEmpty()) {
+printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
+  }
+}
+return false;
+  }
+
+  private static void printUsageAndExit(final String message, final int 
exitCode) {
+printUsage(message);
+System.exit(exitCode);
+  }
+
+  private static void printUsage(final String message) {
+if (message != null && message.length() > 0) {
+  System.err.println(message);
+}
+System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\");
+System.err.println("  <OPTIONS> [-D<property=value>]*");
+System.err.println();
+System.err.println("General Options:");
+System.err.println(" -h|--h|--help  Show this help and exit.");
+System.err
+  .println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. "
++ "See HBASE-27623 for details.");
+  }
+
   @Override
   public int run(String[] args) throws Exception {
 Abortable abortable = new Abortable() {
@@ -217,6 +256,7 @@ public class ReplicationSyncUp extends Configured 
implements Tool {
 return abort;
   }
 };
+boolean isForce = parseOpts(args);
 Configuration conf = getConf();
 try (ZKWatcher zkw = new ZKWatcher(conf,
   "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, 
true)) {
@@ -226,7 +266,7 @@ public class ReplicationSyncUp extends Configured 
implements Tool {
   Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
 
   System.out.println("Start Replication Server");
-  writeInfoFile(fs);
+  writeInfoFile(fs, isForce);
   Replication replication = new Replication();
   // use offline table replication queue storage
   getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java

[hbase] 12/14: HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150)

2023-04-18 Thread zhangduo
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 845ad90bca0ed136b980a69b8628c9acd7091ad4
Author: Liangjun He 
AuthorDate: Wed Apr 5 23:37:04 2023 +0800

HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150)

Signed-off-by: Duo Zhang 
---
 .../regionserver/ReplicationSyncUp.java| 46 --
 .../replication/TestReplicationSyncUpTool.java | 36 +
 .../replication/TestReplicationSyncUpToolBase.java |  7 +++-
 3 files changed, 84 insertions(+), 5 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index f071cf6f1f8..cd6a4d9ac4d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
@@ -182,7 +184,7 @@ public class ReplicationSyncUp extends Configured 
implements Tool {
 }
   }
 
-  private void writeInfoFile(FileSystem fs) throws IOException {
+  private void writeInfoFile(FileSystem fs, boolean isForce) throws 
IOException {
 // Record the info of this run. Currently only record the time we run the 
job. We will use this
 // timestamp to clean up the data for last sequence ids and hfile refs in 
replication queue
 // storage. See 
ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore.
@@ -190,11 +192,48 @@ public class ReplicationSyncUp extends Configured 
implements Tool {
   new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
 String json = JsonMapper.writeObjectAsString(info);
 Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
-try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), 
false)) {
+try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), 
isForce)) {
   out.write(Bytes.toBytes(json));
 }
   }
 
+  private static boolean parseOpts(String args[]) {
+LinkedList<String> argv = new LinkedList<>();
+argv.addAll(Arrays.asList(args));
+String cmd = null;
+while ((cmd = argv.poll()) != null) {
+  if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
+printUsageAndExit(null, 0);
+  }
+  if (cmd.equals("-f")) {
+return true;
+  }
+  if (!argv.isEmpty()) {
+printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
+  }
+}
+return false;
+  }
+
+  private static void printUsageAndExit(final String message, final int 
exitCode) {
+printUsage(message);
+System.exit(exitCode);
+  }
+
+  private static void printUsage(final String message) {
+if (message != null && message.length() > 0) {
+  System.err.println(message);
+}
+System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\");
+System.err.println("  <OPTIONS> [-D<property=value>]*");
+System.err.println();
+System.err.println("General Options:");
+System.err.println(" -h|--h|--help  Show this help and exit.");
+System.err
+  .println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. "
++ "See HBASE-27623 for details.");
+  }
+
   @Override
   public int run(String[] args) throws Exception {
 Abortable abortable = new Abortable() {
@@ -217,6 +256,7 @@ public class ReplicationSyncUp extends Configured 
implements Tool {
 return abort;
   }
 };
+boolean isForce = parseOpts(args);
 Configuration conf = getConf();
 try (ZKWatcher zkw = new ZKWatcher(conf,
   "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, 
true)) {
@@ -226,7 +266,7 @@ public class ReplicationSyncUp extends Configured 
implements Tool {
   Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
 
   System.out.println("Start Replication Server");
-  writeInfoFile(fs);
+  writeInfoFile(fs, isForce);
   Replication replication = new Replication();
   // use offline table replication queue storage
   getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java