This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 13d2405edef HBASE-26867 Introduce a FlushProcedure (#5256)
13d2405edef is described below

commit 13d2405edef1912a4a83e635ea66449b84f8f9a4
Author: Ruanhui <32773751+frostr...@users.noreply.github.com>
AuthorDate: Mon Aug 14 21:52:03 2023 +0800

    HBASE-26867 Introduce a FlushProcedure (#5256)
    
    Co-authored-by: huiruan <876107...@qq.com>
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    (cherry picked from commit 20c9e4ba5f66dd959e7c62a1a8090164a329f571)
---
 .../java/org/apache/hadoop/hbase/client/Admin.java |   9 +
 .../hadoop/hbase/client/AdminOverAsyncAdmin.java   |   5 +
 .../org/apache/hadoop/hbase/client/AsyncAdmin.java |   8 +
 .../hadoop/hbase/client/AsyncHBaseAdmin.java       |   5 +
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    |  62 +++++-
 .../hbase/shaded/protobuf/RequestConverter.java    |  13 ++
 .../java/org/apache/hadoop/hbase/util/Strings.java |   6 +
 .../src/main/protobuf/server/master/Master.proto   |  14 ++
 .../protobuf/server/master/MasterProcedure.proto   |  20 ++
 .../apache/hadoop/hbase/executor/EventType.java    |   8 +-
 .../apache/hadoop/hbase/executor/ExecutorType.java |   4 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  31 +++
 .../hadoop/hbase/master/MasterRpcServices.java     |  19 ++
 .../apache/hadoop/hbase/master/MasterServices.java |  11 +
 .../master/procedure/FlushRegionProcedure.java     | 238 +++++++++++++++++++++
 .../master/procedure/FlushTableProcedure.java      | 199 +++++++++++++++++
 .../master/procedure/TableProcedureInterface.java  |   1 +
 .../hadoop/hbase/master/procedure/TableQueue.java  |   1 +
 .../procedure/flush/FlushTableSubprocedure.java    |  21 +-
 .../flush/MasterFlushTableProcedureManager.java    |  10 +-
 .../RegionServerFlushTableProcedureManager.java    |  18 +-
 .../hbase/regionserver/FlushRegionCallable.java    |  83 +++++++
 .../hadoop/hbase/regionserver/HRegionServer.java   |   4 +
 .../hbase/master/MockNoopMasterServices.java       |   6 +
 .../master/procedure/TestFlushTableProcedure.java  |  69 ++++++
 .../procedure/TestFlushTableProcedureBase.java     |  97 +++++++++
 .../TestFlushTableProcedureMasterRestarts.java     |  76 +++++++
 ...eProcedureWithDoNotSupportFlushTableMaster.java |  82 +++++++
 .../TestFlushWithThroughputController.java         |  10 +-
 .../hbase/rsgroup/VerifyingRSGroupAdmin.java       |   4 +
 .../hadoop/hbase/thrift2/client/ThriftAdmin.java   |   5 +
 31 files changed, 1111 insertions(+), 28 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 547c6f4024a..4d579c16af2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -550,6 +550,15 @@ public interface Admin extends Abortable, Closeable {
    */
   void flush(TableName tableName, byte[] columnFamily) throws IOException;
 
+  /**
+   * Flush the specified column family stores on all regions of the passed table. This runs as a
+   * synchronous operation.
+   * @param tableName      table to flush
+   * @param columnFamilies column families within a table
+   * @throws IOException if a remote or network exception occurs
+   */
+  void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException;
+
   /**
    * Flush an individual region. Synchronous operation.
    * @param regionName region to flush
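
For illustration only (not part of the patch): a minimal usage sketch of the new
overload. The Configuration object and the table/family names are assumptions.

    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      // Flush only the two named column family stores on every region of the
      // table; the call blocks until the flush completes.
      admin.flush(TableName.valueOf("test_table"),
        Arrays.asList(Bytes.toBytes("cf1"), Bytes.toBytes("cf2")));
    }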
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
index 48ce3dd9b01..690b6406fd3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java
@@ -264,6 +264,11 @@ class AdminOverAsyncAdmin implements Admin {
     get(admin.flush(tableName, columnFamily));
   }
 
+  @Override
+  public void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException {
+    get(admin.flush(tableName, columnFamilies));
+  }
+
   @Override
   public void flushRegion(byte[] regionName) throws IOException {
     get(admin.flushRegion(regionName));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 473773e65ce..960982f5e3f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -347,6 +347,14 @@ public interface AsyncAdmin {
    */
   CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily);
 
+  /**
+   * Flush the specified column family stores on all regions of the passed table. The returned
+   * future completes when the flush operation finishes.
+   * @param tableName      table to flush
+   * @param columnFamilies column families within a table
+   */
+  CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilies);
+
   /**
    * Flush an individual region.
    * @param regionName region to flush
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 0fe99afbba8..5ee8a6ab826 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -269,6 +269,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
     return wrap(rawAdmin.flush(tableName, columnFamily));
   }
 
+  @Override
+  public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilies) {
+    return wrap(rawAdmin.flush(tableName, columnFamilies));
+  }
+
   @Override
   public CompletableFuture<Void> flushRegion(byte[] regionName) {
     return wrap(rawAdmin.flushRegion(regionName));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 9b3baec87c7..ee1dfac16bd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ClientMetaTableAccessor;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.ClusterMetrics.Option;
 import org.apache.hadoop.hbase.ClusterMetricsBuilder;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -180,6 +182,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProced
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@@ -950,12 +954,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Void> flush(TableName tableName) {
-    return flush(tableName, null);
+    return flush(tableName, Collections.emptyList());
   }
 
   @Override
   public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
+    return flush(tableName, Collections.singletonList(columnFamily));
+  }
+
+  @Override
+  public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
+    // This is for keeping compatibility with the old implementation.
+    // If the server version is lower than the client version, it is possible that the
+    // flushTable method is not present on the server side; if so, we need to fall back
+    // to the old implementation.
+    List<byte[]> columnFamilies = columnFamilyList.stream()
+      .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
+    FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
+      ng.getNonceGroup(), ng.newNonce());
+    CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
+      tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
+      (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    addListener(procFuture, (ret, error) -> {
+      if (error != null) {
+        if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) {
+          future.completeExceptionally(error);
+        } else if (error instanceof DoNotRetryIOException) {
+          // Usually this is caused by the method not being present on the server side, or
+          // the hbase hadoop version not matching the running hadoop version.
+          // If that happens, we need to fall back to the old flush implementation.
+          LOG.info("Unrecoverable error on the master side. Falling back to FlushTableProcedure V1", error);
+          legacyFlush(future, tableName, columnFamilies);
+        } else {
+          future.completeExceptionally(error);
+        }
+      } else {
+        future.complete(ret);
+      }
+    });
+    return future;
+  }
+
+  private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
+    List<byte[]> columnFamilies) {
     addListener(tableExists(tableName), (exists, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
@@ -969,8 +1011,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
             future.completeExceptionally(new TableNotEnabledException(tableName));
           } else {
             Map<String, String> props = new HashMap<>();
-            if (columnFamily != null) {
-              props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
+            if (columnFamilies != null && !columnFamilies.isEmpty()) {
+              props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
+                .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
             }
             addListener(
               execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
@@ -985,7 +1028,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         });
       }
     });
-    return future;
   }
 
   @Override
@@ -2768,6 +2810,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
     }
   }
 
+  private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {
+
+    FlushTableProcedureBiConsumer(TableName tableName) {
+      super(tableName);
+    }
+
+    @Override
+    String getOperationType() {
+      return "FLUSH";
+    }
+  }
+
   private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
 
     CreateNamespaceProcedureBiConsumer(String namespaceName) {
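
For illustration only (not part of the patch): the asynchronous counterpart. The
DoNotRetryIOException branch above makes the server-side fallback transparent, so a
caller always observes a single future; asyncAdmin and all names are assumptions.

    CompletableFuture<Void> f = asyncAdmin.flush(TableName.valueOf("test_table"),
      Arrays.asList(Bytes.toBytes("cf1")));
    f.join(); // completes when the new procedure, or the V1 fallback, finishes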
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 9c88b61fd67..33884158da4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
@@ -1714,4 +1715,16 @@ public final class RequestConverter {
     }
     return RemoveServersRequest.newBuilder().addAllServers(hostPorts).build();
   }
+
+  public static FlushTableRequest buildFlushTableRequest(final TableName tableName,
+    final List<byte[]> columnFamilies, final long nonceGroup, final long nonce) {
+    FlushTableRequest.Builder builder = FlushTableRequest.newBuilder();
+    builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
+    if (!columnFamilies.isEmpty()) {
+      for (byte[] columnFamily : columnFamilies) {
+        builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
+      }
+    }
+    return builder.setNonceGroup(nonceGroup).setNonce(nonce).build();
+  }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java
index cdf5bf63fb5..3baab9cca21 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.util;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+
 /**
  * Utility for Strings.
  */
@@ -28,6 +31,9 @@ public final class Strings {
   public static final String DEFAULT_SEPARATOR = "=";
   public static final String DEFAULT_KEYVALUE_SEPARATOR = ", ";
 
+  public static final Joiner JOINER = Joiner.on(",");
+  public static final Splitter SPLITTER = Splitter.on(",");
+
   private Strings() {
   }
 
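
For illustration only (not part of the patch): the legacy flush path packs the
requested families into a single string property, so JOINER and SPLITTER must
round-trip; this sketch assumes family names never contain commas.

    String packed = Strings.JOINER.join(Arrays.asList("cf1", "cf2")); // "cf1,cf2"
    List<String> unpacked = ImmutableList.copyOf(Strings.SPLITTER.split(packed));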
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
index 6f512a92152..5d715fdcdd1 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -200,6 +200,17 @@ message ModifyTableResponse {
   optional uint64 proc_id = 1;
 }
 
+message FlushTableRequest {
+  required TableName table_name = 1;
+  repeated bytes column_family = 2;
+  optional uint64 nonce_group = 3 [default = 0];
+  optional uint64 nonce = 4 [default = 0];
+}
+
+message FlushTableResponse {
+  optional uint64 proc_id = 1;
+}
+
 /* Namespace-level protobufs */
 
 message CreateNamespaceRequest {
@@ -1239,6 +1250,9 @@ service MasterService {
 
   rpc FlushMasterStore(FlushMasterStoreRequest)
     returns(FlushMasterStoreResponse);
+
+  rpc FlushTable(FlushTableRequest)
+    returns(FlushTableResponse);
 }
 
 // HBCK Service definitions.
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
index 901abf6bd0c..3f3ecd63b00 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
@@ -191,6 +191,26 @@ message RestoreParentToChildRegionsPair {
   required string child2_region_name = 3;
 }
 
+enum FlushTableState {
+  FLUSH_TABLE_PREPARE = 1;
+  FLUSH_TABLE_FLUSH_REGIONS = 2;
+}
+
+message FlushTableProcedureStateData {
+  required TableName table_name = 1;
+  repeated bytes column_family = 2;
+}
+
+message FlushRegionProcedureStateData {
+  required RegionInfo region = 1;
+  repeated bytes column_family = 2;
+}
+
+message FlushRegionParameter {
+  required RegionInfo region = 1;
+  repeated bytes column_family = 2;
+}
+
 enum SnapshotState {
   SNAPSHOT_PREPARE = 1;
   SNAPSHOT_PRE_OPERATION = 2;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index e79c9c2bc41..07f8339a20d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -291,7 +291,13 @@ public enum EventType {
    * RS verify snapshot.<br>
    * RS_VERIFY_SNAPSHOT
    */
-  RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS);
+  RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS),
+
+  /**
+   * RS flush regions.<br>
+   * RS_FLUSH_OPERATIONS
+   */
+  RS_FLUSH_REGIONS(89, ExecutorType.RS_FLUSH_OPERATIONS);
 
   private final int code;
   private final ExecutorType executor;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index 1af70b2c44b..b16a2dd4f95 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -54,7 +54,9 @@ public enum ExecutorType {
   RS_SWITCH_RPC_THROTTLE(33),
   RS_IN_MEMORY_COMPACTION(34),
   RS_CLAIM_REPLICATION_QUEUE(35),
-  RS_SNAPSHOT_OPERATIONS(36);
+  RS_SNAPSHOT_OPERATIONS(36),
+
+  RS_FLUSH_OPERATIONS(37);
 
   ExecutorType(int value) {
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 1b529149150..995bff17724 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -157,6 +157,7 @@ import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.FlushTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -4381,4 +4382,34 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
     // initialize master side coprocessors before we start handling requests
     this.cpHost = new MasterCoprocessorHost(this, conf);
   }
+
+  @Override
+  public long flushTable(TableName tableName, List<byte[]> columnFamilies, long nonceGroup,
+    long nonce) throws IOException {
+    checkInitialized();
+
+    if (
+      !getConfiguration().getBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED,
+        MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT)
+    ) {
+      throw new DoNotRetryIOException("FlushTableProcedureV2 is DISABLED");
+    }
+
+    return MasterProcedureUtil
+      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
+        @Override
+        protected void run() throws IOException {
+          getMaster().getMasterCoprocessorHost().preTableFlush(tableName);
+          LOG.info(getClientIdAuditPrefix() + " flush " + tableName);
+          submitProcedure(
+            new FlushTableProcedure(procedureExecutor.getEnvironment(), tableName, columnFamilies));
+          getMaster().getMasterCoprocessorHost().postTableFlush(tableName);
+        }
+          getMaster().getMasterCoprocessorHost().postTableFlush(tableName);
+        }
+
+        @Override
+        protected String getDescription() {
+          return "FlushTableProcedure";
+        }
+      });
+  }
 }
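
For illustration only (not part of the patch): switching the gate above off, e.g. in
a test configuration, makes flushTable throw DoNotRetryIOException and pushes clients
onto the legacy fallback path.

    Configuration conf = HBaseConfiguration.create();
    // Same effect as setting hbase.flush.procedure.enabled=false in hbase-site.xml.
    conf.setBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, false);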
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 2e416f5e1a0..b6a17d8503b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -235,6 +235,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaReq
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@@ -3590,4 +3592,21 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
     }
     return FlushMasterStoreResponse.newBuilder().build();
   }
+
+  @Override
+  public FlushTableResponse flushTable(RpcController controller, FlushTableRequest req)
+    throws ServiceException {
+    TableName tableName = ProtobufUtil.toTableName(req.getTableName());
+    List<byte[]> columnFamilies = req.getColumnFamilyCount() > 0
+      ? req.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()).map(ByteString::toByteArray)
+        .collect(Collectors.toList())
+      : null;
+    try {
+      long procId =
+        server.flushTable(tableName, columnFamilies, req.getNonceGroup(), req.getNonce());
+      return FlushTableResponse.newBuilder().setProcId(procId).build();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 95166240c78..933bf0d1815 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -477,4 +477,15 @@ public interface MasterServices extends Server {
    * Flush master local region
    */
   void flushMasterStore() throws IOException;
+
+  /**
+   * Flush an existing table
+   * @param tableName      The table name
+   * @param columnFamilies The column families to flush
+   * @param nonceGroup     the nonce group
+   * @param nonce          the nonce
+   * @return the flush procedure id
+   */
+  long flushTable(final TableName tableName, final List<byte[]> columnFamilies,
+    final long nonceGroup, final long nonce) throws IOException;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java
new file mode 100644
index 00000000000..67f0442b618
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
+import org.apache.hadoop.hbase.master.assignment.ServerState;
+import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.regionserver.FlushRegionCallable;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionProcedureStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+@InterfaceAudience.Private
+public class FlushRegionProcedure extends Procedure<MasterProcedureEnv>
+  implements TableProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlushRegionProcedure.class);
+
+  private RegionInfo region;
+  private List<byte[]> columnFamilies;
+  private ProcedureEvent<?> event;
+  private boolean dispatched;
+  private boolean succ;
+  private RetryCounter retryCounter;
+
+  public FlushRegionProcedure() {
+  }
+
+  public FlushRegionProcedure(RegionInfo region) {
+    this(region, null);
+  }
+
+  public FlushRegionProcedure(RegionInfo region, List<byte[]> columnFamilies) {
+    this.region = region;
+    this.columnFamilies = columnFamilies;
+  }
+
+  @Override
+  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    if (dispatched) {
+      if (succ) {
+        return null;
+      }
+      dispatched = false;
+    }
+
+    RegionStates regionStates = env.getAssignmentManager().getRegionStates();
+    RegionStateNode regionNode = regionStates.getRegionStateNode(region);
+    regionNode.lock();
+    try {
+      if (!regionNode.isInState(State.OPEN) || regionNode.isInTransition()) {
+        LOG.info("State of region {} is not OPEN or in transition. Skip {} 
...", region, this);
+        return null;
+      }
+      ServerName targetServer = regionNode.getRegionLocation();
+      if (targetServer == null) {
+        setTimeoutForSuspend(env,
+          String.format("target server of region %s is null", 
region.getRegionNameAsString()));
+        throw new ProcedureSuspendedException();
+      }
+      ServerState serverState = regionStates.getServerNode(targetServer).getState();
+      if (serverState != ServerState.ONLINE) {
+        setTimeoutForSuspend(env, String.format("target server of region %s %s is in state %s",
+          region.getRegionNameAsString(), targetServer, serverState));
+        throw new ProcedureSuspendedException();
+      }
+      try {
+        env.getRemoteDispatcher().addOperationToNode(targetServer, this);
+        dispatched = true;
+        event = new ProcedureEvent<>(this);
+        event.suspendIfNotReady(this);
+        throw new ProcedureSuspendedException();
+      } catch (FailedRemoteDispatchException e) {
+        setTimeoutForSuspend(env, "Failed send request to " + targetServer);
+        throw new ProcedureSuspendedException();
+      }
+    } finally {
+      regionNode.unlock();
+    }
+  }
+
+  @Override
+  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+    setState(ProcedureProtos.ProcedureState.RUNNABLE);
+    env.getProcedureScheduler().addFront(this);
+    return false;
+  }
+
+  @Override
+  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    return false;
+  }
+
+  @Override
+  public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOException e) {
+    complete(env, e);
+  }
+
+  @Override
+  public void remoteOperationCompleted(MasterProcedureEnv env) {
+    complete(env, null);
+  }
+
+  @Override
+  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
+    complete(env, error);
+  }
+
+  private void complete(MasterProcedureEnv env, Throwable error) {
+    if (isFinished()) {
+      LOG.info("This procedure {} is already finished, skip the rest 
processes", this.getProcId());
+      return;
+    }
+    if (event == null) {
+      LOG.warn("procedure event for {} is null, maybe the procedure is created 
when recovery",
+        getProcId());
+      return;
+    }
+    if (error == null) {
+      succ = true;
+    }
+    event.wake(env.getProcedureScheduler());
+    event = null;
+  }
+
+  private void setTimeoutForSuspend(MasterProcedureEnv env, String reason) {
+    if (retryCounter == null) {
+      retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+    }
+    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+    LOG.warn("{} can not run currently because {}, wait {} ms to retry", this, 
reason, backoff);
+    setTimeout(Math.toIntExact(backoff));
+    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+    skipPersistence();
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    FlushRegionProcedureStateData.Builder builder = FlushRegionProcedureStateData.newBuilder();
+    builder.setRegion(ProtobufUtil.toRegionInfo(region));
+    if (columnFamilies != null) {
+      for (byte[] columnFamily : columnFamilies) {
+        if (columnFamily != null && columnFamily.length > 0) {
+          builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
+        }
+      }
+    }
+    serializer.serialize(builder.build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    FlushRegionProcedureStateData data =
+      serializer.deserialize(FlushRegionProcedureStateData.class);
+    this.region = ProtobufUtil.toRegionInfo(data.getRegion());
+    if (data.getColumnFamilyCount() > 0) {
+      this.columnFamilies = data.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty())
+        .map(ByteString::toByteArray).collect(Collectors.toList());
+    }
+  }
+
+  @Override
+  public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName serverName) {
+    FlushRegionParameter.Builder builder = FlushRegionParameter.newBuilder();
+    builder.setRegion(ProtobufUtil.toRegionInfo(region));
+    if (columnFamilies != null) {
+      for (byte[] columnFamily : columnFamilies) {
+        if (columnFamily != null && columnFamily.length > 0) {
+          builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
+        }
+      }
+    }
+    return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(),
+      FlushRegionCallable.class, builder.build().toByteArray()));
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.FLUSH;
+  }
+
+  @Override
+  protected boolean waitInitialized(MasterProcedureEnv env) {
+    return env.waitInitialized(this);
+  }
+
+  @Override
+  public TableName getTableName() {
+    return region.getTable();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java
new file mode 100644
index 00000000000..892d4d13b5e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableProcedureStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableState;
+
+@InterfaceAudience.Private
+public class FlushTableProcedure extends AbstractStateMachineTableProcedure<FlushTableState> {
+  private static final Logger LOG = LoggerFactory.getLogger(FlushTableProcedure.class);
+
+  private TableName tableName;
+
+  private List<byte[]> columnFamilies;
+
+  public FlushTableProcedure() {
+    super();
+  }
+
+  public FlushTableProcedure(MasterProcedureEnv env, TableName tableName) {
+    this(env, tableName, null);
+  }
+
+  public FlushTableProcedure(MasterProcedureEnv env, TableName tableName,
+    List<byte[]> columnFamilies) {
+    super(env);
+    this.tableName = tableName;
+    this.columnFamilies = columnFamilies;
+  }
+
+  @Override
+  protected LockState acquireLock(MasterProcedureEnv env) {
+    // Here we don't acquire the table lock because the flush operation and other operations
+    // (like split or merge) are not mutually exclusive. A region flushes its memstore when
+    // being closed, so it is safe even without the lock. However, currently we are limited by
+    // the scheduling mechanism of the procedure scheduler and have to acquire the table shared
+    // lock here. See HBASE-27905 for details.
+    if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) {
+      return LockState.LOCK_EVENT_WAIT;
+    }
+    return LockState.LOCK_ACQUIRED;
+  }
+
+  @Override
+  protected void releaseLock(MasterProcedureEnv env) {
+    env.getProcedureScheduler().wakeTableSharedLock(this, getTableName());
+  }
+
+  @Override
+  protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state)
+    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+    LOG.info("{} execute state={}", this, state);
+
+    try {
+      switch (state) {
+        case FLUSH_TABLE_PREPARE:
+          preflightChecks(env, true);
+          setNextState(FlushTableState.FLUSH_TABLE_FLUSH_REGIONS);
+          return Flow.HAS_MORE_STATE;
+        case FLUSH_TABLE_FLUSH_REGIONS:
+          addChildProcedure(createFlushRegionProcedures(env));
+          return Flow.NO_MORE_STATE;
+        default:
+          throw new UnsupportedOperationException("unhandled state=" + state);
+      }
+    } catch (Exception e) {
+      if (e instanceof DoNotRetryIOException) {
+        // for example, TableNotFoundException or TableNotEnabledException
+        setFailure("master-flush-table", e);
+        LOG.warn("Unrecoverable error trying to flush " + getTableName() + " 
state=" + state, e);
+      } else {
+        LOG.warn("Retriable error trying to flush " + getTableName() + " 
state=" + state, e);
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected void rollbackState(MasterProcedureEnv env, FlushTableState state)
+    throws IOException, InterruptedException {
+    // nothing to rollback
+  }
+
+  @Override
+  protected FlushTableState getState(int stateId) {
+    return FlushTableState.forNumber(stateId);
+  }
+
+  @Override
+  protected int getStateId(FlushTableState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected FlushTableState getInitialState() {
+    return FlushTableState.FLUSH_TABLE_PREPARE;
+  }
+
+  @Override
+  public TableName getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.FLUSH;
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.serializeStateData(serializer);
+    FlushTableProcedureStateData.Builder builder = FlushTableProcedureStateData.newBuilder();
+    builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
+    if (columnFamilies != null) {
+      for (byte[] columnFamily : columnFamilies) {
+        if (columnFamily != null && columnFamily.length > 0) {
+          builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
+        }
+      }
+    }
+    serializer.serialize(builder.build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.deserializeStateData(serializer);
+    FlushTableProcedureStateData data = serializer.deserialize(FlushTableProcedureStateData.class);
+    this.tableName = ProtobufUtil.toTableName(data.getTableName());
+    if (data.getColumnFamilyCount() > 0) {
+      this.columnFamilies = data.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty())
+        .map(ByteString::toByteArray).collect(Collectors.toList());
+    }
+  }
+
+  private FlushRegionProcedure[] createFlushRegionProcedures(MasterProcedureEnv env) {
+    return env.getAssignmentManager().getTableRegions(getTableName(), true).stream()
+      .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
+      .map(r -> new FlushRegionProcedure(r, columnFamilies)).toArray(FlushRegionProcedure[]::new);
+  }
+
+  @Override
+  public void toStringClassDetails(StringBuilder builder) {
+    builder.append(getClass().getName()).append(", 
id=").append(getProcId()).append(", table=")
+      .append(tableName);
+    if (columnFamilies != null) {
+      builder.append(", columnFamilies=[")
+        .append(Strings.JOINER
+          
.join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())))
+        .append("]");
+    }
+  }
+
+  @Override
+  protected void afterReplay(MasterProcedureEnv env) {
+    if (
+      !env.getMasterConfiguration().getBoolean(
+        MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED,
+        MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT)
+    ) {
+      setFailure("master-flush-table", new 
HBaseIOException("FlushTableProcedureV2 is DISABLED"));
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
index c689e52302f..1ca5b17ac21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
@@ -42,6 +42,7 @@ public interface TableProcedureInterface {
     READ,
     SNAPSHOT,
     REGION_SNAPSHOT,
+    FLUSH,
     REGION_EDIT,
     REGION_SPLIT,
     REGION_MERGE,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
index 1a9847edcc8..d1acd08ea21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
@@ -56,6 +56,7 @@ class TableQueue extends Queue<TableName> {
         // we allow concurrent edit on the ns family in meta table
         return !proc.getTableName().equals(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
       case READ:
+      case FLUSH:
       case SNAPSHOT:
         return false;
       // region operations are using the shared-lock on the table
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
index b521a85e7a0..88ddf6102a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.hbase.procedure.flush;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.procedure.ProcedureMember;
@@ -41,16 +41,16 @@ public class FlushTableSubprocedure extends Subprocedure {
   private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class);
 
   private final String table;
-  private final String family;
+  private final List<String> families;
   private final List<HRegion> regions;
   private final FlushTableSubprocedurePool taskManager;
 
   public FlushTableSubprocedure(ProcedureMember member, ForeignExceptionDispatcher errorListener,
-    long wakeFrequency, long timeout, List<HRegion> regions, String table, String family,
+    long wakeFrequency, long timeout, List<HRegion> regions, String table, List<String> families,
     FlushTableSubprocedurePool taskManager) {
     super(member, table, errorListener, wakeFrequency, timeout);
     this.table = table;
-    this.family = family;
+    this.families = families;
     this.regions = regions;
     this.taskManager = taskManager;
   }
@@ -70,7 +70,7 @@ public class FlushTableSubprocedure extends Subprocedure {
       region.startRegionOperation();
       try {
         LOG.debug("Flush region " + region.toString() + " started...");
-        if (families == null) {
+        if (families == null || families.isEmpty()) {
           region.flush(true);
         } else {
           region.flushcache(families, false, FlushLifeCycleTracker.DUMMY);
@@ -97,15 +97,16 @@ public class FlushTableSubprocedure extends Subprocedure {
       throw new IllegalStateException(
         "Attempting to flush " + table + " but we currently have outstanding 
tasks");
     }
-    List<byte[]> families = null;
-    if (family != null) {
-      LOG.debug("About to flush family {} on all regions for table {}", family, table);
-      families = Collections.singletonList(Bytes.toBytes(family));
+
+    List<byte[]> familiesToFlush = null;
+    if (families != null && !families.isEmpty()) {
+      LOG.debug("About to flush families {} on all regions for table {}", families, table);
+      familiesToFlush = families.stream().map(Bytes::toBytes).collect(Collectors.toList());
     }
     // Add all hfiles already existing in region.
     for (HRegion region : regions) {
       // submit one task per region for parallelize by region.
-      taskManager.submitTask(new RegionFlushTask(region, families));
+      taskManager.submitTask(new RegionFlushTask(region, familiesToFlush));
       monitor.rethrowException();
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
index 15d3d8a73a9..529d71ba728 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java
@@ -58,6 +58,10 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager {
 
   public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
 
+  public static final String FLUSH_PROCEDURE_ENABLED = "hbase.flush.procedure.enabled";
+
+  public static final boolean FLUSH_PROCEDURE_ENABLED_DEFAULT = true;
+
   private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis";
   private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
   private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis";
@@ -142,13 +146,13 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager {
 
     ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
 
-    HBaseProtos.NameStringPair family = null;
+    HBaseProtos.NameStringPair families = null;
     for (HBaseProtos.NameStringPair nsp : desc.getConfigurationList()) {
       if (HConstants.FAMILY_KEY_STR.equals(nsp.getName())) {
-        family = nsp;
+        families = nsp;
       }
     }
-    byte[] procArgs = family != null ? family.toByteArray() : new byte[0];
+    byte[] procArgs = families != null ? families.toByteArray() : new byte[0];
 
     // Kick off the global procedure from the master coordinator to the region servers.
     // We rely on the existing Distributed Procedure framework to prevent any concurrent
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
index 3322f7a5cd5..2cee89b5749 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -50,6 +51,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@@ -129,7 +131,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
    * of a race where regions may be missed.
    * @return Subprocedure to submit to the ProcedureMember.
    */
-  public Subprocedure buildSubprocedure(String table, String family) {
+  public Subprocedure buildSubprocedure(String table, List<String> families) {
 
     // don't run the subprocedure if the parent is stop(ping)
     if (rss.isStopping() || rss.isStopped()) {
@@ -159,7 +161,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
     FlushTableSubprocedurePool taskManager =
       new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
     return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis,
-      involvedRegions, table, family, taskManager);
+      involvedRegions, table, families, taskManager);
   }
 
   /**
@@ -175,19 +177,19 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
 
     @Override
     public Subprocedure buildSubprocedure(String name, byte[] data) {
-      String family = null;
-      // Currently we do not put other data except family, so it is ok to
-      // judge by length that if family was specified
+      List<String> families = null;
+      // Currently we do not put other data except families, so it is ok to
+      // judge by length that if families were specified
       if (data.length > 0) {
         try {
           HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data);
-          family = nsp.getValue();
+          families = ImmutableList.copyOf(Strings.SPLITTER.split(nsp.getValue()));
         } catch (Exception e) {
-          LOG.error("fail to get family by parsing from data", e);
+          LOG.error("fail to get families by parsing from data", e);
         }
       }
       // The name of the procedure instance from the master is the table name.
-      return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family);
+      return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, families);
     }
 
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java
new file mode 100644
index 00000000000..3dd932a1736
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter;
+
+@InterfaceAudience.Private
+public class FlushRegionCallable extends BaseRSProcedureCallable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlushRegionCallable.class);
+
+  private RegionInfo regionInfo;
+
+  private List<byte[]> columnFamilies;
+
+  @Override
+  protected void doCall() throws Exception {
+    HRegion region = rs.getRegion(regionInfo.getEncodedName());
+    if (region == null) {
+      throw new NotServingRegionException("region=" + regionInfo.getRegionNameAsString());
+    }
+    LOG.debug("Starting region operation on {}", region);
+    region.startRegionOperation();
+    try {
+      HRegion.FlushResult res;
+      if (columnFamilies == null) {
+        res = region.flush(true);
+      } else {
+        res = region.flushcache(columnFamilies, false, FlushLifeCycleTracker.DUMMY);
+      }
+      if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) {
+        throw new IOException("Unable to complete flush " + regionInfo);
+      }
+    } finally {
+      LOG.debug("Closing region operation on {}", region);
+      region.closeRegionOperation();
+    }
+  }
+
+  @Override
+  protected void initParameter(byte[] parameter) throws Exception {
+    FlushRegionParameter param = FlushRegionParameter.parseFrom(parameter);
+    this.regionInfo = ProtobufUtil.toRegionInfo(param.getRegion());
+    if (param.getColumnFamilyCount() > 0) {
+      this.columnFamilies = param.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty())
+        .map(ByteString::toByteArray).collect(Collectors.toList());
+    }
+  }
+
+  @Override
+  public EventType getEventType() {
+    return EventType.RS_FLUSH_REGIONS;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f9f84118106..07d2ac332c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1906,6 +1906,10 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
     executorService.startExecutorService(
       executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
         .setCorePoolSize(rsSnapshotOperationThreads));
+    final int rsFlushOperationThreads =
+      conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3);
+    executorService.startExecutorService(executorService.new ExecutorConfig()
+      .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads));
 
     Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
       uncaughtExceptionHandler);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index c82220a8b22..a19b6ffbec6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -536,4 +536,10 @@ public class MockNoopMasterServices implements MasterServices {
   public Semaphore getSyncReplicationPeerLock() {
     return null;
   }
+
+  @Override
+  public long flushTable(TableName tableName, List<byte[]> columnFamilies, long nonceGroup,
+    long nonce) throws IOException {
+    return 0;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java
new file mode 100644
index 00000000000..cd48370647d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestFlushTableProcedure extends TestFlushTableProcedureBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFlushTableProcedure.class);
+
+  @Test
+  public void testSimpleFlush() throws IOException {
+    assertTableMemStoreNotEmpty();
+    TEST_UTIL.getAdmin().flush(TABLE_NAME);
+    assertTableMemStoreEmpty();
+  }
+
+  @Test
+  public void testFlushTableExceptionally() throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    admin.disableTable(TABLE_NAME);
+    Assert.assertThrows(TableNotEnabledException.class, () -> admin.flush(TABLE_NAME));
+    admin.deleteTable(TABLE_NAME);
+    Assert.assertThrows(TableNotFoundException.class, () -> admin.flush(TABLE_NAME));
+  }
+
+  @Test
+  public void testSingleColumnFamilyFlush() throws IOException {
+    assertColumnFamilyMemStoreNotEmpty(FAMILY1);
+    TEST_UTIL.getAdmin().flush(TABLE_NAME, Arrays.asList(FAMILY1));
+    assertColumnFamilyMemStoreEmpty(FAMILY1);
+  }
+
+  @Test
+  public void testMultiColumnFamilyFlush() throws IOException {
+    assertTableMemStoreNotEmpty();
+    TEST_UTIL.getAdmin().flush(TABLE_NAME, Arrays.asList(FAMILY1, FAMILY2, FAMILY3));
+    assertTableMemStoreEmpty();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBase.java
new file mode 100644
index 00000000000..9e3eee3c704
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBase.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+public class TestFlushTableProcedureBase {
+
+  protected static HBaseTestingUtil TEST_UTIL;
+
+  protected TableName TABLE_NAME;
+  protected byte[] FAMILY1;
+  protected byte[] FAMILY2;
+  protected byte[] FAMILY3;
+
+  @Before
+  public void setup() throws Exception {
+    TEST_UTIL = new HBaseTestingUtil();
+    addConfiguration(TEST_UTIL.getConfiguration());
+    TEST_UTIL.startMiniCluster(3);
+    TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestFlushTable"));
+    FAMILY1 = Bytes.toBytes("cf1");
+    FAMILY2 = Bytes.toBytes("cf2");
+    FAMILY3 = Bytes.toBytes("cf3");
+    final byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(10);
+    Table table =
+      TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY1, FAMILY2, FAMILY3 }, splitKeys);
+    TEST_UTIL.loadTable(table, FAMILY1, false);
+    TEST_UTIL.loadTable(table, FAMILY2, false);
+    TEST_UTIL.loadTable(table, FAMILY3, false);
+  }
+
+  protected void addConfiguration(Configuration config) {
+    // delay dispatch so that we can do something, for example kill a target server
+    config.setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 10000);
+    config.setInt(RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, 128);
+  }
+
+  protected void assertTableMemStoreNotEmpty() {
+    long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream()
+      .mapToLong(HRegion::getMemStoreDataSize).sum();
+    Assert.assertTrue(totalSize > 0);
+  }
+
+  protected void assertTableMemStoreEmpty() {
+    long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream()
+      .mapToLong(HRegion::getMemStoreDataSize).sum();
+    Assert.assertEquals(0, totalSize);
+  }
+
+  protected void assertColumnFamilyMemStoreNotEmpty(byte[] columnFamily) {
+    long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream()
+      .mapToLong(r -> r.getStore(columnFamily).getMemStoreSize().getDataSize()).sum();
+    Assert.assertTrue(totalSize > 0);
+  }
+
+  protected void assertColumnFamilyMemStoreEmpty(byte[] columnFamily) {
+    long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream()
+      .mapToLong(r -> r.getStore(columnFamily).getMemStoreSize().getDataSize()).sum();
+    Assert.assertEquals(0, totalSize);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (TEST_UTIL.getHBaseCluster().getMaster() != null) {
+      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(
+        TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false);
+    }
+    TEST_UTIL.shutdownMiniCluster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureMasterRestarts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureMasterRestarts.java
new file mode 100644
index 00000000000..c0c038982a9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureMasterRestarts.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestFlushTableProcedureMasterRestarts extends TestFlushTableProcedureBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFlushTableProcedureMasterRestarts.class);
+
+  @Test
+  public void testMasterRestarts() throws IOException {
+    assertTableMemStoreNotEmpty();
+
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
+    MasterProcedureEnv env = procExec.getEnvironment();
+    FlushTableProcedure proc = new FlushTableProcedure(env, TABLE_NAME);
+    long procId = procExec.submitProcedure(proc);
+    TEST_UTIL.waitFor(5000, 1000, () -> proc.getState().getNumber() > 1);
+
+    TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
+    TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 30000);
+    TEST_UTIL.getHBaseCluster().startMaster();
+    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
+
+    master = TEST_UTIL.getHBaseCluster().getMaster();
+    procExec = master.getMasterProcedureExecutor();
+    ProcedureTestingUtility.waitProcedure(procExec, procId);
+    assertTableMemStoreEmpty();
+  }
+
+  @Test
+  public void testSkipRIT() throws IOException {
+    HRegion region = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).get(0);
+
+    TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+      .getRegionStateNode(region.getRegionInfo())
+      .setState(RegionState.State.CLOSING, RegionState.State.OPEN);
+
+    FlushRegionProcedure proc = new FlushRegionProcedure(region.getRegionInfo());
+    TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().submitProcedure(proc);
+
+    // wait for a period shorter than the RSProcedureDispatcher dispatch delay (10s);
+    // the procedure should skip the RIT region and finish without dispatching a flush
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java
new file mode 100644
index 00000000000..66ccf362fef
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestFlushTableProcedureWithDoNotSupportFlushTableMaster
+  extends TestFlushTableProcedureBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFlushTableProcedureWithDoNotSupportFlushTableMaster.class);
+
+  @Override
+  protected void addConfiguration(Configuration config) {
+    super.addConfiguration(config);
+    config.set(HConstants.MASTER_IMPL, DoNotSupportFlushTableMaster.class.getName());
+  }
+
+  @Test
+  public void testFlushFallback() throws IOException {
+    assertTableMemStoreNotEmpty();
+    TEST_UTIL.getAdmin().flush(TABLE_NAME);
+    assertTableMemStoreEmpty();
+  }
+
+  @Test
+  public void testSingleColumnFamilyFlushFallback() throws IOException {
+    assertColumnFamilyMemStoreNotEmpty(FAMILY1);
+    TEST_UTIL.getAdmin().flush(TABLE_NAME, FAMILY1);
+    assertColumnFamilyMemStoreEmpty(FAMILY1);
+  }
+
+  @Test
+  public void testMultiColumnFamilyFlushFallback() throws IOException {
+    assertTableMemStoreNotEmpty();
+    TEST_UTIL.getAdmin().flush(TABLE_NAME, Arrays.asList(FAMILY1, FAMILY2, FAMILY3));
+    assertTableMemStoreEmpty();
+  }
+
+  public static final class DoNotSupportFlushTableMaster extends HMaster {
+
+    public DoNotSupportFlushTableMaster(Configuration conf) throws IOException {
+      super(conf);
+    }
+
+    @Override
+    public long flushTable(TableName tableName, List<byte[]> columnFamilies, long nonceGroup,
+      long nonce) throws IOException {
+      throw new DoNotRetryIOException("UnsupportedOperation: flushTable");
+    }
+  }
+}
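
The tests above simulate a master that rejects the new flushTable RPC, verifying that Admin#flush still succeeds by falling back to the older flush mechanism. A hedged sketch of that client-side pattern; flushViaProcedure and flushViaLegacyPath are hypothetical stand-ins, not the actual RawAsyncHBaseAdmin methods:

    // prefer the new master-side FlushTableProcedure, fall back on rejection
    void flushWithFallback(TableName tableName, List<byte[]> families) throws IOException {
      try {
        flushViaProcedure(tableName, families);    // hypothetical: new flushTable RPC path
      } catch (DoNotRetryIOException e) {
        flushViaLegacyPath(tableName, families);   // hypothetical: pre-existing flush path
      }
    }
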
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java
index dc84642741f..c1d9e2788d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.throttle;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.List;
@@ -124,7 +125,14 @@ public class TestFlushWithThroughputController {
        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
       }
       long startTime = System.nanoTime();
-      hbtu.getAdmin().flush(tableName);
+      hbtu.getHBaseCluster().getRegions(tableName).stream().findFirst().ifPresent(r -> {
+        try {
+          r.flush(true);
+        } catch (IOException e) {
+          LOG.error("Failed flush region {}", r, e);
+          fail("Failed flush region " + 
r.getRegionInfo().getRegionNameAsString());
+        }
+      });
       duration += System.nanoTime() - startTime;
     }
     HStore store = getStoreWithName(tableName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
index 1c94affd1dd..9b1d8524d00 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java
@@ -237,6 +237,10 @@ public class VerifyingRSGroupAdmin implements Admin, Closeable {
     admin.flush(tableName, columnFamily);
   }
 
+  public void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException {
+    admin.flush(tableName, columnFamilies);
+  }
+
   public void flushRegion(byte[] regionName) throws IOException {
     admin.flushRegion(regionName);
   }
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 1b3c29ebe66..1b7b6938524 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -490,6 +490,11 @@ public class ThriftAdmin implements Admin {
     throw new NotImplementedException("flush not supported in ThriftAdmin");
   }
 
+  @Override
+  public void flush(TableName tableName, List<byte[]> columnFamilies) {
+    throw new NotImplementedException("flush not supported in ThriftAdmin");
+  }
+
   @Override
   public void flushRegion(byte[] regionName) {
     throw new NotImplementedException("flushRegion not supported in 
ThriftAdmin");
