HBASE-20377 Deal with table in enabling and disabling state when modifying serial replication peer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5a633adf Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5a633adf Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5a633adf Branch: refs/heads/HBASE-20388 Commit: 5a633adffead3b979f6e1a607994409978b0ea74 Parents: 826909a Author: zhangduo <zhang...@apache.org> Authored: Fri Apr 13 16:21:03 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri Apr 13 20:33:29 2018 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/client/TableState.java | 14 ++++ .../master/procedure/DisableTableProcedure.java | 6 +- .../master/replication/ModifyPeerProcedure.java | 84 +++++++++++++------- .../TestAddToSerialReplicationPeer.java | 74 +++++++++++++++++ 4 files changed, 146 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/5a633adf/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java index cc3b765..40612e9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java @@ -104,6 +104,13 @@ public class TableState { } /** + * @return True if table is {@link State#ENABLING}. + */ + public boolean isEnabling() { + return isInStates(State.ENABLING); + } + + /** * @return True if {@link State#ENABLED} or {@link State#ENABLING} */ public boolean isEnabledOrEnabling() { @@ -118,6 +125,13 @@ public class TableState { } /** + * @return True if table is disabling. + */ + public boolean isDisabling() { + return isInStates(State.DISABLING); + } + + /** * @return True if {@link State#DISABLED} or {@link State#DISABLED} */ public boolean isDisabledOrDisabling() { http://git-wip-us.apache.org/repos/asf/hbase/blob/5a633adf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index f5caff7..685a73e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; @@ -107,7 +108,7 @@ public class DisableTableProcedure break; case DISABLE_TABLE_MARK_REGIONS_OFFLINE: addChildProcedure(env.getAssignmentManager().createUnassignProcedures(tableName)); - setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE); + setNextState(DisableTableState.DISABLE_TABLE_ADD_REPLICATION_BARRIER); break; case DISABLE_TABLE_ADD_REPLICATION_BARRIER: if (env.getMasterServices().getTableDescriptors().get(tableName) @@ -119,7 +120,8 @@ public class DisableTableProcedure .getRegionsOfTable(tableName)) { long maxSequenceId = WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region)); - mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, maxSequenceId, + long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM; + mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum, EnvironmentEdgeManager.currentTime())); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5a633adf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 8bedeff..0b9efce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -18,17 +18,16 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException; -import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; @@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +54,9 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000; + // The sleep interval when waiting table to be enabled or disabled. + protected static final int SLEEP_INTERVAL_MS = 1000; + protected ModifyPeerProcedure() { } @@ -126,6 +127,27 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi throw new UnsupportedOperationException(); } + // If the table is in enabling state, we need to wait until it is enabled and then reopen all its + // regions. + private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException { + for (;;) { + try { + TableState state = tsm.getTableState(tn); + if (state.isEnabled()) { + return true; + } + if (!state.isEnabling()) { + return false; + } + Thread.sleep(SLEEP_INTERVAL_MS); + } catch (TableStateNotFoundException e) { + return false; + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e); + } + } + } + private void reopenRegions(MasterProcedureEnv env) throws IOException { ReplicationPeerConfig peerConfig = getNewPeerConfig(); ReplicationPeerConfig oldPeerConfig = getOldPeerConfig(); @@ -142,15 +164,10 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi ReplicationUtils.contains(oldPeerConfig, tn)) { continue; } - try { - if (!tsm.getTableState(tn).isEnabled()) { - continue; - } - } catch (TableStateNotFoundException e) { - continue; + if (needReopen(tsm, tn)) { + addChildProcedure(env.getAssignmentManager().createReopenProcedures( + env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn))); } - addChildProcedure(env.getAssignmentManager().createReopenProcedures( - env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn))); } } @@ -183,6 +200,26 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi } } + // If the table is currently disabling, then we need to wait until it is disabled.We will write + // replication barrier for a disabled table. And return whether we need to update the last pushed + // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException, + // then we do not need to update last pushed sequence id for this table. + private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn) + throws IOException { + for (;;) { + try { + if (!tsm.getTableState(tn).isDisabling()) { + return true; + } + Thread.sleep(SLEEP_INTERVAL_MS); + } catch (TableStateNotFoundException e) { + return false; + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e); + } + } + } + // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller // should not forget to check whether the map is empty at last, if not you should call @@ -192,26 +229,13 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi TableStateManager tsm = env.getMasterServices().getTableStateManager(); ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage(); Connection conn = env.getMasterServices().getConnection(); - RegionStates regionStates = env.getAssignmentManager().getRegionStates(); - MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); - boolean isTableEnabled; - try { - isTableEnabled = tsm.getTableState(tableName).isEnabled(); - } catch (TableStateNotFoundException e) { + if (!needSetLastPushedSequenceId(tsm, tableName)) { return; } - if (isTableEnabled) { - for (Pair<String, Long> name2Barrier : MetaTableAccessor - .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) { - addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1, - queueStorage); - } - } else { - for (RegionInfo region : regionStates.getRegionsOfTable(tableName, true)) { - long maxSequenceId = - WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region)); - addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage); - } + for (Pair<String, Long> name2Barrier : MetaTableAccessor + .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) { + addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1, + queueStorage); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5a633adf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java index 317c120..d3d6cbe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.replication; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.Collections; import org.apache.hadoop.fs.Path; @@ -26,6 +28,8 @@ import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.replication.regionserver.Replication; @@ -192,4 +196,74 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase { waitUntilReplicationDone(100); checkOrder(100); } + + @Test + public void testDisablingTable() throws Exception { + TableName tableName = createTable(); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + UTIL.getAdmin().disableTable(tableName); + rollAllWALs(); + TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager(); + tsm.setTableState(tableName, TableState.State.DISABLING); + Thread t = new Thread(() -> { + try { + addPeer(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + t.start(); + Thread.sleep(5000); + // we will wait on the disabling table so the thread should still be alive. + assertTrue(t.isAlive()); + tsm.setTableState(tableName, TableState.State.DISABLED); + t.join(); + UTIL.getAdmin().enableTable(tableName); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + waitUntilReplicationDone(100); + checkOrder(100); + } + + @Test + public void testEnablingTable() throws Exception { + TableName tableName = createTable(); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); + HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName)); + moveRegionAndArchiveOldWals(region, rs); + TableStateManager tsm = UTIL.getMiniHBaseCluster().getMaster().getTableStateManager(); + tsm.setTableState(tableName, TableState.State.ENABLING); + Thread t = new Thread(() -> { + try { + addPeer(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + t.start(); + Thread.sleep(5000); + // we will wait on the disabling table so the thread should still be alive. + assertTrue(t.isAlive()); + tsm.setTableState(tableName, TableState.State.ENABLED); + t.join(); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + waitUntilReplicationDone(100); + checkOrder(100); + } }