PHOENIX-4600 Add retry logic for partial index rebuilder writes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b768900d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b768900d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b768900d Branch: refs/heads/system-catalog Commit: b768900d9f9699fad612acc643866e3e420e7ce0 Parents: fc6cf43 Author: Vincent Poon <vincentp...@apache.org> Authored: Thu Apr 19 10:30:14 2018 -0700 Committer: Vincent Poon <vincentp...@apache.org> Committed: Thu Apr 19 12:03:34 2018 -0700 ---------------------------------------------------------------------- .../end2end/index/MutableIndexRebuilderIT.java | 143 +++++++++++++++++++ .../UngroupedAggregateRegionObserver.java | 32 +++-- 2 files changed, 160 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b768900d/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexRebuilderIT.java new file mode 100644 index 0000000..8420f16 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexRebuilderIT.java @@ -0,0 +1,143 @@ +/* NOTE(review): this new file begins directly with the package declaration and carries no license header; Apache projects normally require one (e.g. for the RAT check), so verify before merging. */ package org.apache.phoenix.end2end.index; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.phoenix.coprocessor.MetaDataRegionObserver; +import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask; +import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.RunUntilFailure; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Test; +import org.junit.runner.RunWith; + +import com.google.common.collect.Maps; + +/** Integration test for the retry behavior of the partial index rebuilder ({@code MetaDataRegionObserver.BuildIndexScheduleTask}) when its index writes fail. */ @RunWith(RunUntilFailure.class) /* NOTE(review): RunUntilFailure looks like a repeat-until-failure debugging runner. WriteFailingRegionObserver.attempts is static and never reset in this file, so a repeated run would presumably start from the previous run count and the exact-count assertion could not hold; confirm this runner is meant to be committed. */ +public class MutableIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { + private static final int WAIT_AFTER_DISABLED = 0; /* value for INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB */ + private static final long REBUILD_PERIOD = 50000; /* INDEX_FAILURE_HANDLING_REBUILD_PERIOD, in ms (50 seconds) */ + private static final long REBUILD_INTERVAL = 2000; /* INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB; presumably ms */ + private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment; /* environment of MetaDataRegionObserver on the first online SYSTEM.CATALOG region of its region server; assigned by the test, used by runIndexRebuilder */ + + /** + * Tests that the index rebuilder retries a failing index write for exactly the configured # of retries ({@code HConstants.HBASE_CLIENT_RETRIES_NUMBER}): the index gets a WriteFailingRegionObserver that fails every batch, and the number of write attempts it observes must equal that setting. + * @throws Throwable if cluster setup, the rebuild, or an assertion fails + */ + @Test + public void testRebuildRetriesSuccessful() throws Throwable { + int numberOfRetries = 5; + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); 
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString()); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, Long.toString(REBUILD_INTERVAL)); + serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "50000000"); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, Long.toString(REBUILD_PERIOD)); // batch at 50 seconds + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED)); + serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numberOfRetries + ""); /* retry count that the rebuilder writes are expected to honor */ + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + indexRebuildTaskRegionEnvironment = /* look up the MetaDataRegionObserver environment so the rebuild task can be driven directly from the test */ + (RegionCoprocessorEnvironment) getUtility() + .getRSForFirstRegionInTable( + PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) + .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) + .get(0).getCoprocessorHost() + .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName()); + MetaDataRegionObserver.initRebuildIndexConnectionProps( + indexRebuildTaskRegionEnvironment.getConfiguration()); + try (Connection conn = DriverManager.getConnection(getUrl())) { + String schemaName = generateUniqueName(); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) DISABLE_INDEX_ON_WRITE_FAILURE = TRUE"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)"); + HTableInterface metaTable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + IndexUtil.updateIndexState(fullIndexName, EnvironmentEdgeManager.currentTimeMillis(), metaTable, PIndexState.DISABLE); /* put the index into DISABLE state before writing the data row */ + conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')"); + conn.commit(); + // Simulate write failure when rebuilder runs + TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class); + waitForIndexState(conn, fullTableName, fullIndexName, PIndexState.INACTIVE); + // rebuild writes should retry for exactly the configured number of times + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Future<Boolean> future = executor.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + runIndexRebuilder(fullTableName); + return true; + }}); + assertTrue(future.get(120, TimeUnit.SECONDS)); /* the rebuild runs on a separate thread so that a hung rebuild fails the test after 120s instead of blocking it */ + assertEquals(numberOfRetries, WriteFailingRegionObserver.attempts.get()); + } finally { + executor.shutdownNow(); + } + } + } + + /** Runs the index rebuilder for fullTableName up to twice, sleeping 1s after each check that misses, until fullIndexName reaches expectedIndexState; otherwise fails the test with the last observed state. */ public static void waitForIndexState(Connection conn, String fullTableName, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException { + int nRetries = 2; + PIndexState actualIndexState = null; + do { + runIndexRebuilder(fullTableName); + if ((actualIndexState = TestUtil.getIndexState(conn, fullIndexName)) == expectedIndexState) { + return; + } + Thread.sleep(1000); + } while (--nRetries > 0); + fail("Expected index state of " + expectedIndexState + ", but was " + actualIndexState); + } + + /** Runs one index rebuilder pass for a single table. */ private static void runIndexRebuilder(String table) throws InterruptedException, SQLException { + runIndexRebuilder(Collections.<String>singletonList(table)); + } + + /** Runs one BuildIndexScheduleTask pass synchronously, on the calling thread, for the given tables. */ private static void runIndexRebuilder(List<String> tables) throws InterruptedException, SQLException { + BuildIndexScheduleTask task = + new MetaDataRegionObserver.BuildIndexScheduleTask( + indexRebuildTaskRegionEnvironment, 
tables); + task.run(); + } + + /** Test coprocessor that counts every postBatchMutate call and then fails the batch with a DoNotRetryIOException. */ public static class WriteFailingRegionObserver extends SimpleRegionObserver { + public static volatile AtomicInteger attempts = new AtomicInteger(0); /* NOTE(review): never reset in this file. The reference is never reassigned here, so static final would express the intent better than volatile. */ + @Override + public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + attempts.incrementAndGet(); + throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString()); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b768900d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index d202193..abdcf72 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -243,6 +243,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator()); } + /** Commits the mutations to the region via commitBatch; on IOException, passes the failure to handleIndexWriteException along with a command that re-runs the same commitBatch call. */ private void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException { + try { + commitBatch(region, localRegionMutations, blockingMemstoreSize); + } catch (IOException e) { + handleIndexWriteException(localRegionMutations, e, new MutateCommand() { + @Override + public void doMutation() throws IOException { + commitBatch(region, localRegionMutations, blockingMemstoreSize); + } + }); + } + } + private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException { if (mutations.isEmpty()) { return; @@ -251,7 
+264,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver Mutation[] mutationArray = new Mutation[mutations.size()]; // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the // flush happen which decrease the memstore size and then writes allowed on the region. - for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { + for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { /* a non-positive blockingMemstoreSize skips this wait; callers may pass -1 to opt out */ try { checkForRegionClosing(); Thread.sleep(100); @@ -892,16 +905,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto); separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations, isPKChanging); - try { - commitBatch(region, localRegionMutations, blockingMemStoreSize); - } catch (IOException e) { - handleIndexWriteException(localRegionMutations, e, new MutateCommand() { - @Override - public void doMutation() throws IOException { - commitBatch(region, localRegionMutations, blockingMemStoreSize); - } - }); - } + commitBatchWithRetries(region, localRegionMutations, blockingMemStoreSize); try { commitBatchWithHTable(targetHTable, remoteRegionMutations); } catch (IOException e) { @@ -1069,8 +1073,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { - region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE, - HConstants.NO_NONCE); + commitBatchWithRetries(region, mutations, -1); /* -1 skips the blocking-memstore wait in commitBatch */ uuidValue = ServerCacheClient.generateId(); mutations.clear(); } @@ -1079,8 +1082,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } while (hasMore); if (!mutations.isEmpty()) { - 
region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE, - HConstants.NO_NONCE); + commitBatchWithRetries(region, mutations, -1); /* -1 skips the blocking-memstore wait in commitBatch */ } } } catch (IOException e) {