This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-3 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push: new 13d2405edef HBASE-26867 Introduce a FlushProcedure (#5256) 13d2405edef is described below commit 13d2405edef1912a4a83e635ea66449b84f8f9a4 Author: Ruanhui <32773751+frostr...@users.noreply.github.com> AuthorDate: Mon Aug 14 21:52:03 2023 +0800 HBASE-26867 Introduce a FlushProcedure (#5256) Co-authored-by: huiruan <876107...@qq.com> Signed-off-by: Duo Zhang <zhang...@apache.org> (cherry picked from commit 20c9e4ba5f66dd959e7c62a1a8090164a329f571) --- .../java/org/apache/hadoop/hbase/client/Admin.java | 9 + .../hadoop/hbase/client/AdminOverAsyncAdmin.java | 5 + .../org/apache/hadoop/hbase/client/AsyncAdmin.java | 8 + .../hadoop/hbase/client/AsyncHBaseAdmin.java | 5 + .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 62 +++++- .../hbase/shaded/protobuf/RequestConverter.java | 13 ++ .../java/org/apache/hadoop/hbase/util/Strings.java | 6 + .../src/main/protobuf/server/master/Master.proto | 14 ++ .../protobuf/server/master/MasterProcedure.proto | 20 ++ .../apache/hadoop/hbase/executor/EventType.java | 8 +- .../apache/hadoop/hbase/executor/ExecutorType.java | 4 +- .../org/apache/hadoop/hbase/master/HMaster.java | 31 +++ .../hadoop/hbase/master/MasterRpcServices.java | 19 ++ .../apache/hadoop/hbase/master/MasterServices.java | 11 + .../master/procedure/FlushRegionProcedure.java | 238 +++++++++++++++++++++ .../master/procedure/FlushTableProcedure.java | 199 +++++++++++++++++ .../master/procedure/TableProcedureInterface.java | 1 + .../hadoop/hbase/master/procedure/TableQueue.java | 1 + .../procedure/flush/FlushTableSubprocedure.java | 21 +- .../flush/MasterFlushTableProcedureManager.java | 10 +- .../RegionServerFlushTableProcedureManager.java | 18 +- .../hbase/regionserver/FlushRegionCallable.java | 83 +++++++ .../hadoop/hbase/regionserver/HRegionServer.java | 4 + .../hbase/master/MockNoopMasterServices.java | 6 + .../master/procedure/TestFlushTableProcedure.java | 69 ++++++ 
.../procedure/TestFlushTableProcedureBase.java | 97 +++++++++ .../TestFlushTableProcedureMasterRestarts.java | 76 +++++++ ...eProcedureWithDoNotSupportFlushTableMaster.java | 82 +++++++ .../TestFlushWithThroughputController.java | 10 +- .../hbase/rsgroup/VerifyingRSGroupAdmin.java | 4 + .../hadoop/hbase/thrift2/client/ThriftAdmin.java | 5 + 31 files changed, 1111 insertions(+), 28 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 547c6f4024a..4d579c16af2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -550,6 +550,15 @@ public interface Admin extends Abortable, Closeable { */ void flush(TableName tableName, byte[] columnFamily) throws IOException; + /** + * Flush the specified column family stores on all regions of the passed table. This runs as a + * synchronous operation. + * @param tableName table to flush + * @param columnFamilies column families within a table + * @throws IOException if a remote or network exception occurs + */ + void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException; + /** * Flush an individual region. Synchronous operation. 
* @param regionName region to flush diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java index 48ce3dd9b01..690b6406fd3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java @@ -264,6 +264,11 @@ class AdminOverAsyncAdmin implements Admin { get(admin.flush(tableName, columnFamily)); } + @Override + public void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException { + get(admin.flush(tableName, columnFamilies)); + } + @Override public void flushRegion(byte[] regionName) throws IOException { get(admin.flushRegion(regionName)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 473773e65ce..960982f5e3f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -347,6 +347,14 @@ public interface AsyncAdmin { */ CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily); + /** + * Flush the specified column family stores on all regions of the passed table. This is an + * asynchronous operation; completion is reported through the returned future. + * @param tableName table to flush + * @param columnFamilies column families within a table + */ + CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilies); + /** * Flush an individual region. 
* @param regionName region to flush diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 0fe99afbba8..5ee8a6ab826 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -269,6 +269,11 @@ class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.flush(tableName, columnFamily)); } + @Override + public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilies) { + return wrap(rawAdmin.flush(tableName, columnFamilies)); + } + @Override public CompletableFuture<Void> flushRegion(byte[] regionName) { return wrap(rawAdmin.flushRegion(regionName)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 9b3baec87c7..ee1dfac16bd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ClientMetaTableAccessor; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.ClusterMetricsBuilder; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NamespaceDescriptor; @@ -96,6 +97,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Strings; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -180,6 +182,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProced import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; @@ -950,12 +954,50 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<Void> flush(TableName tableName) { - return flush(tableName, null); + return flush(tableName, Collections.emptyList()); } @Override public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) { + return flush(tableName, Collections.singletonList(columnFamily)); + } + + @Override + public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) { + // This is for keeping compatibility with old implementation. + // If the server version is lower than the client version, it's possible that the + // flushTable method is not present in the server side, if so, we need to fall back + // to the old implementation. 
+ List<byte[]> columnFamilies = columnFamilyList.stream() + .filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList()); + FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies, + ng.getNonceGroup(), ng.newNonce()); + CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall( + tableName, request, (s, c, req, done) -> s.flushTable(c, req, done), + (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName)); CompletableFuture<Void> future = new CompletableFuture<>(); + addListener(procFuture, (ret, error) -> { + if (error != null) { + if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) { + future.completeExceptionally(error); + } else if (error instanceof DoNotRetryIOException) { + // usually this is caused by the method is not present on the server or + // the hbase hadoop version does not match the running hadoop version. + // if that happens, we need fall back to the old flush implementation. + LOG.info("Unrecoverable error in master side. 
Fallback to FlushTableProcedure V1", error); + legacyFlush(future, tableName, columnFamilies); + } else { + future.completeExceptionally(error); + } + } else { + future.complete(ret); + } + }); + return future; + } + + private void legacyFlush(CompletableFuture<Void> future, TableName tableName, + List<byte[]> columnFamilies) { addListener(tableExists(tableName), (exists, err) -> { if (err != null) { future.completeExceptionally(err); @@ -969,8 +1011,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { future.completeExceptionally(new TableNotEnabledException(tableName)); } else { Map<String, String> props = new HashMap<>(); - if (columnFamily != null) { - props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); + if (columnFamilies != null && !columnFamilies.isEmpty()) { + props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER + .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))); } addListener( execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), @@ -985,7 +1028,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { }); } }); - return future; } @Override @@ -2768,6 +2810,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } + private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer { + + FlushTableProcedureBiConsumer(TableName tableName) { + super(tableName); + } + + @Override + String getOperationType() { + return "FLUSH"; + } + } + private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer { CreateNamespaceProcedureBiConsumer(String namespaceName) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 9c88b61fd67..33884158da4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; @@ -1714,4 +1715,16 @@ public final class RequestConverter { } return RemoveServersRequest.newBuilder().addAllServers(hostPorts).build(); } + + public static FlushTableRequest buildFlushTableRequest(final TableName tableName, + final List<byte[]> columnFamilies, final long nonceGroup, final long nonce) { + FlushTableRequest.Builder builder = FlushTableRequest.newBuilder(); + builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); + if (!columnFamilies.isEmpty()) { + for (byte[] columnFamily : columnFamilies) { + builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + } + return builder.setNonceGroup(nonceGroup).setNonce(nonce).build(); + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java index cdf5bf63fb5..3baab9cca21 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java @@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.util; import org.apache.commons.lang3.StringUtils; import 
org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.base.Joiner; +import org.apache.hbase.thirdparty.com.google.common.base.Splitter; + /** * Utility for Strings. */ @@ -28,6 +31,9 @@ public final class Strings { public static final String DEFAULT_SEPARATOR = "="; public static final String DEFAULT_KEYVALUE_SEPARATOR = ", "; + public static final Joiner JOINER = Joiner.on(","); + public static final Splitter SPLITTER = Splitter.on(","); + private Strings() { } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto index 6f512a92152..5d715fdcdd1 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto @@ -200,6 +200,17 @@ message ModifyTableResponse { optional uint64 proc_id = 1; } +message FlushTableRequest { + required TableName table_name = 1; + repeated bytes column_family = 2; + optional uint64 nonce_group = 3 [default = 0]; + optional uint64 nonce = 4 [default = 0]; +} + +message FlushTableResponse { + optional uint64 proc_id = 1; +} + /* Namespace-level protobufs */ message CreateNamespaceRequest { @@ -1239,6 +1250,9 @@ service MasterService { rpc FlushMasterStore(FlushMasterStoreRequest) returns(FlushMasterStoreResponse); + + rpc FlushTable(FlushTableRequest) + returns(FlushTableResponse); } // HBCK Service definitions. 
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 901abf6bd0c..3f3ecd63b00 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -191,6 +191,26 @@ message RestoreParentToChildRegionsPair { required string child2_region_name = 3; } +enum FlushTableState { + FLUSH_TABLE_PREPARE = 1; + FLUSH_TABLE_FLUSH_REGIONS = 2; +} + +message FlushTableProcedureStateData { + required TableName table_name = 1; + repeated bytes column_family = 2; +} + +message FlushRegionProcedureStateData { + required RegionInfo region = 1; + repeated bytes column_family = 2; +} + +message FlushRegionParameter { + required RegionInfo region = 1; + repeated bytes column_family = 2; +} + enum SnapshotState { SNAPSHOT_PREPARE = 1; SNAPSHOT_PRE_OPERATION = 2; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index e79c9c2bc41..07f8339a20d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -291,7 +291,13 @@ public enum EventType { * RS verify snapshot.<br> * RS_VERIFY_SNAPSHOT */ - RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS); + RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS), + + /** + * RS flush regions.<br> + * RS_FLUSH_OPERATIONS + */ + RS_FLUSH_REGIONS(89, ExecutorType.RS_FLUSH_OPERATIONS); private final int code; private final ExecutorType executor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 1af70b2c44b..b16a2dd4f95 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -54,7 +54,9 @@ public enum ExecutorType { RS_SWITCH_RPC_THROTTLE(33), RS_IN_MEMORY_COMPACTION(34), RS_CLAIM_REPLICATION_QUEUE(35), - RS_SNAPSHOT_OPERATIONS(36); + RS_SNAPSHOT_OPERATIONS(36), + + RS_FLUSH_OPERATIONS(37); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 1b529149150..995bff17724 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -157,6 +157,7 @@ import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure; import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure; import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; +import org.apache.hadoop.hbase.master.procedure.FlushTableProcedure; import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -4381,4 +4382,34 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste // initialize master side coprocessors before we start handling requests this.cpHost = new MasterCoprocessorHost(this, conf); } + + @Override + public long flushTable(TableName tableName, List<byte[]> columnFamilies, long nonceGroup, + long nonce) throws IOException { + checkInitialized(); + + if ( + !getConfiguration().getBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT) + ) { + throw new DoNotRetryIOException("FlushTableProcedureV2 is DISABLED"); + } + + 
return MasterProcedureUtil + .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preTableFlush(tableName); + LOG.info(getClientIdAuditPrefix() + " flush " + tableName); + submitProcedure( + new FlushTableProcedure(procedureExecutor.getEnvironment(), tableName, columnFamilies)); + getMaster().getMasterCoprocessorHost().postTableFlush(tableName); + } + + @Override + protected String getDescription() { + return "FlushTableProcedure"; + } + }); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 2e416f5e1a0..b6a17d8503b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -235,6 +235,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaReq import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; @@ -3590,4 +3592,21 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster> } return FlushMasterStoreResponse.newBuilder().build(); } + + @Override + public 
FlushTableResponse flushTable(RpcController controller, FlushTableRequest req) + throws ServiceException { + TableName tableName = ProtobufUtil.toTableName(req.getTableName()); + List<byte[]> columnFamilies = req.getColumnFamilyCount() > 0 + ? req.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()).map(ByteString::toByteArray) + .collect(Collectors.toList()) + : null; + try { + long procId = + server.flushTable(tableName, columnFamilies, req.getNonceGroup(), req.getNonce()); + return FlushTableResponse.newBuilder().setProcId(procId).build(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 95166240c78..933bf0d1815 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -477,4 +477,15 @@ public interface MasterServices extends Server { * Flush master local region */ void flushMasterStore() throws IOException; + + /** + * Flush an existing table + * @param tableName The table name + * @param columnFamilies The column families to flush + * @param nonceGroup the nonce group + * @param nonce the nonce + * @return the flush procedure id + */ + long flushTable(final TableName tableName, final List<byte[]> columnFamilies, + final long nonceGroup, final long nonce) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java new file mode 100644 index 00000000000..67f0442b618 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.assignment.RegionStateNode; +import org.apache.hadoop.hbase.master.assignment.RegionStates; +import org.apache.hadoop.hbase.master.assignment.ServerState; +import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; 
+import org.apache.hadoop.hbase.regionserver.FlushRegionCallable; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionProcedureStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +@InterfaceAudience.Private +public class FlushRegionProcedure extends Procedure<MasterProcedureEnv> + implements TableProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> { + private static final Logger LOG = LoggerFactory.getLogger(FlushRegionProcedure.class); + + private RegionInfo region; + private List<byte[]> columnFamilies; + private ProcedureEvent<?> event; + private boolean dispatched; + private boolean succ; + private RetryCounter retryCounter; + + public FlushRegionProcedure() { + } + + public FlushRegionProcedure(RegionInfo region) { + this(region, null); + } + + public FlushRegionProcedure(RegionInfo region, List<byte[]> columnFamilies) { + this.region = region; + this.columnFamilies = columnFamilies; + } + + @Override + protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + dispatched = false; + } + + RegionStates regionStates = env.getAssignmentManager().getRegionStates(); + RegionStateNode regionNode = regionStates.getRegionStateNode(region); + regionNode.lock(); + try { + if (!regionNode.isInState(State.OPEN) || regionNode.isInTransition()) { + LOG.info("State 
of region {} is not OPEN or in transition. Skip {} ...", region, this); + return null; + } + ServerName targetServer = regionNode.getRegionLocation(); + if (targetServer == null) { + setTimeoutForSuspend(env, + String.format("target server of region %s is null", region.getRegionNameAsString())); + throw new ProcedureSuspendedException(); + } + ServerState serverState = regionStates.getServerNode(targetServer).getState(); + if (serverState != ServerState.ONLINE) { + setTimeoutForSuspend(env, String.format("target server of region %s %s is in state %s", + region.getRegionNameAsString(), targetServer, serverState)); + throw new ProcedureSuspendedException(); + } + try { + env.getRemoteDispatcher().addOperationToNode(targetServer, this); + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } catch (FailedRemoteDispatchException e) { + setTimeoutForSuspend(env, "Failed send request to " + targetServer); + throw new ProcedureSuspendedException(); + } + } finally { + regionNode.unlock(); + } + } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOException e) { + complete(env, e); + } + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + complete(env, error); + } + + private void complete(MasterProcedureEnv env, Throwable error) { + if 
(isFinished()) { + LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId()); + return; + } + if (event == null) { + LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", + getProcId()); + return; + } + if (error == null) { + succ = true; + } + event.wake(env.getProcedureScheduler()); + event = null; + } + + private void setTimeoutForSuspend(MasterProcedureEnv env, String reason) { + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); + } + long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); + LOG.warn("{} can not run currently because {}, wait {} ms to retry", this, reason, backoff); + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + FlushRegionProcedureStateData.Builder builder = FlushRegionProcedureStateData.newBuilder(); + builder.setRegion(ProtobufUtil.toRegionInfo(region)); + if (columnFamilies != null) { + for (byte[] columnFamily : columnFamilies) { + if (columnFamily != null && columnFamily.length > 0) { + builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + } + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + FlushRegionProcedureStateData data = + serializer.deserialize(FlushRegionProcedureStateData.class); + this.region = ProtobufUtil.toRegionInfo(data.getRegion()); + if (data.getColumnFamilyCount() > 0) { + this.columnFamilies = data.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()) + .map(ByteString::toByteArray).collect(Collectors.toList()); + } + } + + @Override + public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName serverName) { + FlushRegionParameter.Builder 
builder = FlushRegionParameter.newBuilder(); + builder.setRegion(ProtobufUtil.toRegionInfo(region)); + if (columnFamilies != null) { + for (byte[] columnFamily : columnFamilies) { + if (columnFamily != null && columnFamily.length > 0) { + builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + } + } + return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(), + FlushRegionCallable.class, builder.build().toByteArray())); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.FLUSH; + } + + @Override + protected boolean waitInitialized(MasterProcedureEnv env) { + return env.waitInitialized(this); + } + + @Override + public TableName getTableName() { + return region.getTable(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java new file mode 100644 index 00000000000..892d4d13b5e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableProcedureStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableState; + +@InterfaceAudience.Private +public class FlushTableProcedure extends AbstractStateMachineTableProcedure<FlushTableState> { + private static final Logger LOG = LoggerFactory.getLogger(FlushTableProcedure.class); + + private TableName tableName; + + private List<byte[]> columnFamilies; + + public FlushTableProcedure() { + super(); + } + + public FlushTableProcedure(MasterProcedureEnv env, TableName tableName) { + this(env, tableName, null); + } + + public FlushTableProcedure(MasterProcedureEnv env, TableName tableName, + List<byte[]> columnFamilies) { + super(env); + this.tableName = tableName; + this.columnFamilies = columnFamilies; + } + + @Override + protected LockState acquireLock(MasterProcedureEnv env) { + // Here we 
don't acquire table lock because the flush operation and other operations (like + // split or merge) are not mutually exclusive. Region will flush memstore when being closed. + // It's safe even if we don't have lock. However, currently we are limited by the scheduling + // mechanism of the procedure scheduler and have to acquire table shared lock here. See + // HBASE-27905 for details. + if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(MasterProcedureEnv env) { + env.getProcedureScheduler().wakeTableSharedLock(this, getTableName()); + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + LOG.info("{} execute state={}", this, state); + + try { + switch (state) { + case FLUSH_TABLE_PREPARE: + preflightChecks(env, true); + setNextState(FlushTableState.FLUSH_TABLE_FLUSH_REGIONS); + return Flow.HAS_MORE_STATE; + case FLUSH_TABLE_FLUSH_REGIONS: + addChildProcedure(createFlushRegionProcedures(env)); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (Exception e) { + if (e instanceof DoNotRetryIOException) { + // for example, TableNotFoundException or TableNotEnabledException + setFailure("master-flush-table", e); + LOG.warn("Unrecoverable error trying to flush " + getTableName() + " state=" + state, e); + } else { + LOG.warn("Retriable error trying to flush " + getTableName() + " state=" + state, e); + } + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(MasterProcedureEnv env, FlushTableState state) + throws IOException, InterruptedException { + // nothing to rollback + } + + @Override + protected FlushTableState getState(int stateId) { + return FlushTableState.forNumber(stateId); + } + + 
@Override + protected int getStateId(FlushTableState state) { + return state.getNumber(); + } + + @Override + protected FlushTableState getInitialState() { + return FlushTableState.FLUSH_TABLE_PREPARE; + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.FLUSH; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + FlushTableProcedureStateData.Builder builder = FlushTableProcedureStateData.newBuilder(); + builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); + if (columnFamilies != null) { + for (byte[] columnFamily : columnFamilies) { + if (columnFamily != null && columnFamily.length > 0) { + builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + } + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + FlushTableProcedureStateData data = serializer.deserialize(FlushTableProcedureStateData.class); + this.tableName = ProtobufUtil.toTableName(data.getTableName()); + if (data.getColumnFamilyCount() > 0) { + this.columnFamilies = data.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()) + .map(ByteString::toByteArray).collect(Collectors.toList()); + } + } + + private FlushRegionProcedure[] createFlushRegionProcedures(MasterProcedureEnv env) { + return env.getAssignmentManager().getTableRegions(getTableName(), true).stream() + .filter(r -> RegionReplicaUtil.isDefaultReplica(r)) + .map(r -> new FlushRegionProcedure(r, columnFamilies)).toArray(FlushRegionProcedure[]::new); + } + + @Override + public void toStringClassDetails(StringBuilder builder) { + builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", table=") + .append(tableName); + if 
(columnFamilies != null) { + builder.append(", columnFamilies=[") + .append(Strings.JOINER + .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList()))) + .append("]"); + } + } + + @Override + protected void afterReplay(MasterProcedureEnv env) { + if ( + !env.getMasterConfiguration().getBoolean( + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT) + ) { + setFailure("master-flush-table", new HBaseIOException("FlushTableProcedureV2 is DISABLED")); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java index c689e52302f..1ca5b17ac21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java @@ -42,6 +42,7 @@ public interface TableProcedureInterface { READ, SNAPSHOT, REGION_SNAPSHOT, + FLUSH, REGION_EDIT, REGION_SPLIT, REGION_MERGE, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java index 1a9847edcc8..d1acd08ea21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java @@ -56,6 +56,7 @@ class TableQueue extends Queue<TableName> { // we allow concurrent edit on the ns family in meta table return !proc.getTableName().equals(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME); case READ: + case FLUSH: case SNAPSHOT: return false; // region operations are using the shared-lock on the table diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java index b521a85e7a0..88ddf6102a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java @@ -17,9 +17,9 @@ */ package org.apache.hadoop.hbase.procedure.flush; -import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.procedure.ProcedureMember; @@ -41,16 +41,16 @@ public class FlushTableSubprocedure extends Subprocedure { private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class); private final String table; - private final String family; + private final List<String> families; private final List<HRegion> regions; private final FlushTableSubprocedurePool taskManager; public FlushTableSubprocedure(ProcedureMember member, ForeignExceptionDispatcher errorListener, - long wakeFrequency, long timeout, List<HRegion> regions, String table, String family, + long wakeFrequency, long timeout, List<HRegion> regions, String table, List<String> families, FlushTableSubprocedurePool taskManager) { super(member, table, errorListener, wakeFrequency, timeout); this.table = table; - this.family = family; + this.families = families; this.regions = regions; this.taskManager = taskManager; } @@ -70,7 +70,7 @@ public class FlushTableSubprocedure extends Subprocedure { region.startRegionOperation(); try { LOG.debug("Flush region " + region.toString() + " started..."); - if (families == null) { + if (families == null || families.isEmpty()) { region.flush(true); } else { region.flushcache(families, false, FlushLifeCycleTracker.DUMMY); @@ -97,15 +97,16 @@ public class 
FlushTableSubprocedure extends Subprocedure { throw new IllegalStateException( "Attempting to flush " + table + " but we currently have outstanding tasks"); } - List<byte[]> families = null; - if (family != null) { - LOG.debug("About to flush family {} on all regions for table {}", family, table); - families = Collections.singletonList(Bytes.toBytes(family)); + + List<byte[]> familiesToFlush = null; + if (families != null && !families.isEmpty()) { + LOG.debug("About to flush family {} on all regions for table {}", families, table); + familiesToFlush = families.stream().map(Bytes::toBytes).collect(Collectors.toList()); } // Add all hfiles already existing in region. for (HRegion region : regions) { // submit one task per region for parallelize by region. - taskManager.submitTask(new RegionFlushTask(region, families)); + taskManager.submitTask(new RegionFlushTask(region, familiesToFlush)); monitor.rethrowException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java index 15d3d8a73a9..529d71ba728 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java @@ -58,6 +58,10 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager { public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; + public static final String FLUSH_PROCEDURE_ENABLED = "hbase.flush.procedure.enabled"; + + public static final boolean FLUSH_PROCEDURE_ENABLED_DEFAULT = true; + private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis"; private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000; private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis"; @@ 
-142,13 +146,13 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager { ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); - HBaseProtos.NameStringPair family = null; + HBaseProtos.NameStringPair families = null; for (HBaseProtos.NameStringPair nsp : desc.getConfigurationList()) { if (HConstants.FAMILY_KEY_STR.equals(nsp.getName())) { - family = nsp; + families = nsp; } } - byte[] procArgs = family != null ? family.toByteArray() : new byte[0]; + byte[] procArgs = families != null ? families.toByteArray() : new byte[0]; // Kick of the global procedure from the master coordinator to the region servers. // We rely on the existing Distributed Procedure framework to prevent any concurrent diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java index 3322f7a5cd5..2cee89b5749 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -50,6 +51,7 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -129,7 +131,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur * of a race where regions may be missed. * @return Subprocedure to submit to the ProcedureMember. */ - public Subprocedure buildSubprocedure(String table, String family) { + public Subprocedure buildSubprocedure(String table, List<String> families) { // don't run the subprocedure if the parent is stop(ping) if (rss.isStopping() || rss.isStopped()) { @@ -159,7 +161,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur FlushTableSubprocedurePool taskManager = new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss); return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, - involvedRegions, table, family, taskManager); + involvedRegions, table, families, taskManager); } /** @@ -175,19 +177,19 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur @Override public Subprocedure buildSubprocedure(String name, byte[] data) { - String family = null; - // Currently we do not put other data except family, so it is ok to - // judge by length that if family was specified + List<String> families = null; + // Currently we do not put other data except families, so it is ok to + // judge by length that if families were specified if (data.length > 0) { try { HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data); - family = nsp.getValue(); + families = ImmutableList.copyOf(Strings.SPLITTER.split(nsp.getValue())); } catch (Exception e) { - LOG.error("fail to get family by parsing from data", e); + LOG.error("fail to get families by parsing from data", e); } } // The name of the procedure instance from the master is the table name. 
- return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family); + return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, families); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java new file mode 100644 index 00000000000..3dd932a1736 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; + +@InterfaceAudience.Private +public class FlushRegionCallable extends BaseRSProcedureCallable { + + private static final Logger LOG = LoggerFactory.getLogger(FlushRegionCallable.class); + + private RegionInfo regionInfo; + + private List<byte[]> columnFamilies; + + @Override + protected void doCall() throws Exception { + HRegion region = rs.getRegion(regionInfo.getEncodedName()); + if (region == null) { + throw new NotServingRegionException("region=" + regionInfo.getRegionNameAsString()); + } + LOG.debug("Starting region operation on {}", region); + region.startRegionOperation(); + try { + HRegion.FlushResult res; + if (columnFamilies == null) { + res = region.flush(true); + } else { + res = region.flushcache(columnFamilies, false, FlushLifeCycleTracker.DUMMY); + } + if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) { + throw new IOException("Unable to complete flush " + regionInfo); + } + } finally { + LOG.debug("Closing region operation on {}", region); + region.closeRegionOperation(); + } + } + + @Override + protected void initParameter(byte[] parameter) throws Exception { + FlushRegionParameter param = FlushRegionParameter.parseFrom(parameter); + this.regionInfo = ProtobufUtil.toRegionInfo(param.getRegion()); + if 
(param.getColumnFamilyCount() > 0) { + this.columnFamilies = param.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()) + .map(ByteString::toByteArray).collect(Collectors.toList()); + } + } + + @Override + public EventType getEventType() { + return EventType.RS_FLUSH_REGIONS; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f9f84118106..07d2ac332c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1906,6 +1906,10 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices> executorService.startExecutorService( executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS) .setCorePoolSize(rsSnapshotOperationThreads)); + final int rsFlushOperationThreads = + conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3); + executorService.startExecutorService(executorService.new ExecutorConfig() + .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads)); Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index c82220a8b22..a19b6ffbec6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -536,4 +536,10 @@ public class MockNoopMasterServices implements MasterServices { public Semaphore getSyncReplicationPeerLock() { return null; } + + @Override + public long flushTable(TableName tableName, List<byte[]> columnFamilies, long nonceGroup, + 
long nonce) throws IOException { + return 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java new file mode 100644 index 00000000000..cd48370647d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.Arrays; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestFlushTableProcedure extends TestFlushTableProcedureBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFlushTableProcedure.class); + + @Test + public void testSimpleFlush() throws IOException { + assertTableMemStoreNotEmpty(); + TEST_UTIL.getAdmin().flush(TABLE_NAME); + assertTableMemStoreEmpty(); + } + + @Test + public void testFlushTableExceptionally() throws IOException { + Admin admin = TEST_UTIL.getAdmin(); + admin.disableTable(TABLE_NAME); + Assert.assertThrows(TableNotEnabledException.class, () -> admin.flush(TABLE_NAME)); + admin.deleteTable(TABLE_NAME); + Assert.assertThrows(TableNotFoundException.class, () -> admin.flush(TABLE_NAME)); + } + + @Test + public void testSingleColumnFamilyFlush() throws IOException { + assertColumnFamilyMemStoreNotEmpty(FAMILY1); + TEST_UTIL.getAdmin().flush(TABLE_NAME, FAMILY1); + assertColumnFamilyMemStoreEmpty(FAMILY1); + } + + @Test + public void testMultiColumnFamilyFlush() throws IOException { + assertTableMemStoreNotEmpty(); + TEST_UTIL.getAdmin().flush(TABLE_NAME, Arrays.asList(FAMILY1, FAMILY2, FAMILY3)); + assertTableMemStoreEmpty(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBase.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBase.java new file mode 100644 index 00000000000..9e3eee3c704 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBase.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RegionSplitter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +public class TestFlushTableProcedureBase { + + protected static HBaseTestingUtil TEST_UTIL; + + protected TableName TABLE_NAME; + protected byte[] FAMILY1; + protected byte[] FAMILY2; + protected byte[] FAMILY3; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtil(); + addConfiguration(TEST_UTIL.getConfiguration()); + TEST_UTIL.startMiniCluster(3); + TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestFlushTable")); + FAMILY1 = Bytes.toBytes("cf1"); + FAMILY2 = Bytes.toBytes("cf2"); + FAMILY3 = Bytes.toBytes("cf3"); + final byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(10); + Table table = + TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY1, FAMILY2, FAMILY3 }, splitKeys); + TEST_UTIL.loadTable(table, FAMILY1, false); + TEST_UTIL.loadTable(table, FAMILY2, false); + TEST_UTIL.loadTable(table, FAMILY3, false); + } + + protected void addConfiguration(Configuration config) { + // delay dispatch so that we can do something, for example kill a target server + config.setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 10000); + config.setInt(RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, 128); + } + + protected void assertTableMemStoreNotEmpty() { + long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream() + .mapToLong(HRegion::getMemStoreDataSize).sum(); + 
Assert.assertTrue(totalSize > 0); + } + + protected void assertTableMemStoreEmpty() { + long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream() + .mapToLong(HRegion::getMemStoreDataSize).sum(); + Assert.assertEquals(0, totalSize); + } + + protected void assertColumnFamilyMemStoreNotEmpty(byte[] columnFamily) { + long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream() + .mapToLong(r -> r.getStore(columnFamily).getMemStoreSize().getDataSize()).sum(); + Assert.assertTrue(totalSize > 0); + } + + protected void assertColumnFamilyMemStoreEmpty(byte[] columnFamily) { + long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream() + .mapToLong(r -> r.getStore(columnFamily).getMemStoreSize().getDataSize()).sum(); + Assert.assertEquals(0, totalSize); + } + + @After + public void teardown() throws Exception { + if (TEST_UTIL.getHBaseCluster().getMaster() != null) { + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate( + TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false); + } + TEST_UTIL.shutdownMiniCluster(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureMasterRestarts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureMasterRestarts.java new file mode 100644 index 00000000000..c0c038982a9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureMasterRestarts.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestFlushTableProcedureMasterRestarts extends TestFlushTableProcedureBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFlushTableProcedureMasterRestarts.class); + + @Test + public void testMasterRestarts() throws IOException { + assertTableMemStoreNotEmpty(); + + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor(); + MasterProcedureEnv env = procExec.getEnvironment(); + FlushTableProcedure proc = new FlushTableProcedure(env, TABLE_NAME); + long procId = procExec.submitProcedure(proc); + TEST_UTIL.waitFor(5000, 1000, () -> proc.getState().getNumber() > 1); + + TEST_UTIL.getHBaseCluster().killMaster(master.getServerName()); + 
TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 30000); + TEST_UTIL.getHBaseCluster().startMaster(); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + + master = TEST_UTIL.getHBaseCluster().getMaster(); + procExec = master.getMasterProcedureExecutor(); + ProcedureTestingUtility.waitProcedure(procExec, procId); + assertTableMemStoreEmpty(); + } + + @Test + public void testSkipRIT() throws IOException { + HRegion region = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).get(0); + + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates() + .getRegionStateNode(region.getRegionInfo()) + .setState(RegionState.State.CLOSING, RegionState.State.OPEN); + + FlushRegionProcedure proc = new FlushRegionProcedure(region.getRegionInfo()); + TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().submitProcedure(proc); + + // wait for a time which is shorter than RSProcedureDispatcher delays + TEST_UTIL.waitFor(5000, () -> proc.isFinished()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java new file mode 100644 index 00000000000..66ccf362fef --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** Runs Admin flush calls on a cluster whose master implementation is configured to be DoNotSupportFlushTableMaster, an HMaster subclass whose flushTable always throws DoNotRetryIOException. Each test calls the memstore assertion helpers before and after the flush. NOTE(review): presumably the flush still succeeds because the client falls back to another flush path when the master rejects flushTable; that fallback code is not visible here. */ @Category({ MasterTests.class, MediumTests.class }) +public class TestFlushTableProcedureWithDoNotSupportFlushTableMaster + extends TestFlushTableProcedureBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFlushTableProcedureWithDoNotSupportFlushTableMaster.class); + + /** Applies the superclass configuration, then sets HConstants.MASTER_IMPL to the class name of DoNotSupportFlushTableMaster. */ @Override + protected void addConfiguration(Configuration config) { + super.addConfiguration(config); + config.set(HConstants.MASTER_IMPL, DoNotSupportFlushTableMaster.class.getName()); + } + + /** Flushes the whole table with flush(TABLE_NAME), bracketed by assertTableMemStoreNotEmpty() and assertTableMemStoreEmpty(). */ @Test + public void testFlushFallback() throws IOException { + assertTableMemStoreNotEmpty(); + TEST_UTIL.getAdmin().flush(TABLE_NAME); + assertTableMemStoreEmpty(); + } + + /** Flushes only FAMILY1 with flush(TABLE_NAME, FAMILY1), bracketed by the FAMILY1 column family memstore assertions. */ @Test + public void testSingleColumnFamilyFlushFallback() throws IOException { + assertColumnFamilyMemStoreNotEmpty(FAMILY1); + TEST_UTIL.getAdmin().flush(TABLE_NAME, FAMILY1); + assertColumnFamilyMemStoreEmpty(FAMILY1);
+ } + + /** Flushes FAMILY1, FAMILY2 and FAMILY3 through the list-taking flush overload, bracketed by the whole-table memstore assertions. */ @Test + public void testMultiColumnFamilyFlushFallback() throws IOException { + assertTableMemStoreNotEmpty(); + TEST_UTIL.getAdmin().flush(TABLE_NAME, Arrays.asList(FAMILY1, FAMILY2, FAMILY3)); + assertTableMemStoreEmpty(); + } + + /** HMaster subclass installed as the master implementation by addConfiguration. Its flushTable override ignores its arguments and unconditionally throws DoNotRetryIOException. */ public static final class DoNotSupportFlushTableMaster extends HMaster { + + public DoNotSupportFlushTableMaster(Configuration conf) throws IOException { + super(conf); + } + + @Override + public long flushTable(TableName tableName, List<byte[]> columnFamilies, long nonceGroup, + long nonce) throws IOException { + throw new DoNotRetryIOException("UnsupportedOperation: flushTable"); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java index dc84642741f..c1d9e2788d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.throttle; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.List; @@ -124,7 +125,14 @@ public class TestFlushWithThroughputController { table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); } long startTime = System.nanoTime(); - hbtu.getAdmin().flush(tableName); + /* Replaces the removed admin flush call: flush(true) is invoked directly on the first region returned by getRegions(tableName), if there is one. An IOException from that call is logged and then fails the test. */ hbtu.getHBaseCluster().getRegions(tableName).stream().findFirst().ifPresent(r -> { + try { + r.flush(true); + } catch (IOException e) { + LOG.error("Failed flush region {}", r, e); + fail("Failed flush region " + r.getRegionInfo().getRegionNameAsString()); + } + }); duration += System.nanoTime() - startTime; } HStore store = getStoreWithName(tableName); diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java index 1c94affd1dd..9b1d8524d00 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java @@ -237,6 +237,10 @@ public class VerifyingRSGroupAdmin implements Admin, Closeable { admin.flush(tableName, columnFamily); } + /** Forwards the flush of the given column families of the table to admin.flush(tableName, columnFamilies). */ public void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException { + admin.flush(tableName, columnFamilies); + } + public void flushRegion(byte[] regionName) throws IOException { admin.flushRegion(regionName); } diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index 1b3c29ebe66..1b7b6938524 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -490,6 +490,11 @@ public class ThriftAdmin implements Admin { throw new NotImplementedException("flush not supported in ThriftAdmin"); } + /** Always throws NotImplementedException: this flush overload is not supported in ThriftAdmin. */ @Override + public void flush(TableName tableName, List<byte[]> columnFamilies) { + throw new NotImplementedException("flush not supported in ThriftAdmin"); + } + @Override public void flushRegion(byte[] regionName) { throw new NotImplementedException("flushRegion not supported in ThriftAdmin");