[hbase] 12/14: HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150)
This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git commit e4f34a2aca735b707a60f5d33e5d69884ab8c204 Author: Liangjun He AuthorDate: Wed Apr 5 23:37:04 2023 +0800 HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150) Signed-off-by: Duo Zhang --- .../regionserver/ReplicationSyncUp.java| 46 -- .../replication/TestReplicationSyncUpTool.java | 36 + .../replication/TestReplicationSyncUpToolBase.java | 7 +++- 3 files changed, 84 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index f071cf6f1f8..cd6a4d9ac4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -182,7 +184,7 @@ public class ReplicationSyncUp extends Configured implements Tool { } } - private void writeInfoFile(FileSystem fs) throws IOException { + private void writeInfoFile(FileSystem fs, boolean isForce) throws IOException { // Record the info of this run. Currently only record the time we run the job. We will use this // timestamp to clean up the data for last sequence ids and hfile refs in replication queue // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore. @@ -190,11 +192,48 @@ public class ReplicationSyncUp extends Configured implements Tool { new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime()); String json = JsonMapper.writeObjectAsString(info); Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR); -try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), false)) { +try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), isForce)) { out.write(Bytes.toBytes(json)); } } + private static boolean parseOpts(String args[]) { +LinkedList argv = new LinkedList<>(); +argv.addAll(Arrays.asList(args)); +String cmd = null; +while ((cmd = argv.poll()) != null) { + if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) { +printUsageAndExit(null, 0); + } + if (cmd.equals("-f")) { +return true; + } + if (!argv.isEmpty()) { +printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); + } +} +return false; + } + + private static void printUsageAndExit(final String message, final int exitCode) { +printUsage(message); +System.exit(exitCode); + } + + private static void printUsage(final String message) { +if (message != null && message.length() > 0) { + System.err.println(message); +} +System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\"); +System.err.println(" [-D]*"); +System.err.println(); +System.err.println("General Options:"); +System.err.println(" -h|--h|--help Show this help and exit."); +System.err + .println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. " ++ "See HBASE-27623 for details."); + } + @Override public int run(String[] args) throws Exception { Abortable abortable = new Abortable() { @@ -217,6 +256,7 @@ public class ReplicationSyncUp extends Configured implements Tool { return abort; } }; +boolean isForce = parseOpts(args); Configuration conf = getConf(); try (ZKWatcher zkw = new ZKWatcher(conf, "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) { @@ -226,7 +266,7 @@ public class ReplicationSyncUp extends Configured implements Tool { Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); System.out.println("Start Replication Server"); - writeInfoFile(fs); + writeInfoFile(fs, isForce); Replication replication = new Replication(); // use offline table replication queue storage getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
[hbase] 12/14: HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150)
This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git commit 845ad90bca0ed136b980a69b8628c9acd7091ad4 Author: Liangjun He AuthorDate: Wed Apr 5 23:37:04 2023 +0800 HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150) Signed-off-by: Duo Zhang --- .../regionserver/ReplicationSyncUp.java| 46 -- .../replication/TestReplicationSyncUpTool.java | 36 + .../replication/TestReplicationSyncUpToolBase.java | 7 +++- 3 files changed, 84 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index f071cf6f1f8..cd6a4d9ac4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -182,7 +184,7 @@ public class ReplicationSyncUp extends Configured implements Tool { } } - private void writeInfoFile(FileSystem fs) throws IOException { + private void writeInfoFile(FileSystem fs, boolean isForce) throws IOException { // Record the info of this run. Currently only record the time we run the job. We will use this // timestamp to clean up the data for last sequence ids and hfile refs in replication queue // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore. @@ -190,11 +192,48 @@ public class ReplicationSyncUp extends Configured implements Tool { new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime()); String json = JsonMapper.writeObjectAsString(info); Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR); -try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), false)) { +try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), isForce)) { out.write(Bytes.toBytes(json)); } } + private static boolean parseOpts(String args[]) { +LinkedList argv = new LinkedList<>(); +argv.addAll(Arrays.asList(args)); +String cmd = null; +while ((cmd = argv.poll()) != null) { + if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) { +printUsageAndExit(null, 0); + } + if (cmd.equals("-f")) { +return true; + } + if (!argv.isEmpty()) { +printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); + } +} +return false; + } + + private static void printUsageAndExit(final String message, final int exitCode) { +printUsage(message); +System.exit(exitCode); + } + + private static void printUsage(final String message) { +if (message != null && message.length() > 0) { + System.err.println(message); +} +System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\"); +System.err.println(" [-D]*"); +System.err.println(); +System.err.println("General Options:"); +System.err.println(" -h|--h|--help Show this help and exit."); +System.err + .println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. " ++ "See HBASE-27623 for details."); + } + @Override public int run(String[] args) throws Exception { Abortable abortable = new Abortable() { @@ -217,6 +256,7 @@ public class ReplicationSyncUp extends Configured implements Tool { return abort; } }; +boolean isForce = parseOpts(args); Configuration conf = getConf(); try (ZKWatcher zkw = new ZKWatcher(conf, "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) { @@ -226,7 +266,7 @@ public class ReplicationSyncUp extends Configured implements Tool { Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); System.out.println("Start Replication Server"); - writeInfoFile(fs); + writeInfoFile(fs, isForce); Replication replication = new Replication(); // use offline table replication queue storage getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java