PHOENIX-4704 Presplit index tables when building asynchronously
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6ab9b372 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6ab9b372 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6ab9b372 Branch: refs/heads/4.x-HBase-1.4 Commit: 6ab9b372f16f37b11e657b6803c6a60007815824 Parents: cb17adb Author: Vincent Poon <vincentp...@apache.org> Authored: Fri May 18 11:22:26 2018 -0700 Committer: Vincent Poon <vincentp...@apache.org> Committed: Fri May 18 16:42:53 2018 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/IndexToolIT.java | 107 +++++++++++++- .../phoenix/mapreduce/index/IndexTool.java | 142 ++++++++++++++++++- 2 files changed, 243 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ab9b372/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index afb6d72..a120aaa 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -21,12 +21,15 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; @@ -34,8 +37,16 @@ import java.util.Properties; import 
java.util.UUID; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.IndexTool; -import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.util.PropertiesUtil; @@ -54,7 +65,7 @@ import com.google.common.collect.Maps; @RunWith(Parameterized.class) @Category(NeedsOwnMiniClusterTest.class) -public class IndexToolIT extends BaseTest { +public class IndexToolIT extends ParallelStatsEnabledIT { private final boolean localIndex; private final boolean transactional; @@ -85,7 +96,7 @@ public class IndexToolIT extends BaseTest { } @BeforeClass - public static void doSetup() throws Exception { + public static void setup() throws Exception { Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2); serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); @@ -249,6 +260,87 @@ public class IndexToolIT extends BaseTest { } } + /** + * Test presplitting an index table + */ + @Test + public void testSplitIndex() throws Exception { + if (localIndex) return; // can't split local indexes + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + final TableName dataTN = TableName.valueOf(dataTableFullName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + 
TableName indexTN = TableName.valueOf(indexTableFullName); + try (Connection conn = + DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)); + HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) { + String dataDDL = + "CREATE TABLE " + dataTableFullName + "(\n" + + "ID VARCHAR NOT NULL PRIMARY KEY,\n" + + "\"info\".CAR_NUM VARCHAR(18) NULL,\n" + + "\"test\".CAR_NUM VARCHAR(18) NULL,\n" + + "\"info\".CAP_DATE VARCHAR NULL,\n" + "\"info\".ORG_ID BIGINT NULL,\n" + + "\"info\".ORG_NAME VARCHAR(255) NULL\n" + ") COLUMN_ENCODED_BYTES = 0"; + conn.createStatement().execute(dataDDL); + + String[] carNumPrefixes = new String[] {"a", "b", "c", "d"}; + + // split the data table, as the tool splits the index table to have the same # of regions + // doesn't really matter what the split points are, we just want a target # of regions + int numSplits = carNumPrefixes.length; + int targetNumRegions = numSplits + 1; + byte[][] splitPoints = new byte[numSplits][]; + for (String prefix : carNumPrefixes) { + splitPoints[--numSplits] = Bytes.toBytes(prefix); + } + HTableDescriptor dataTD = admin.getTableDescriptor(dataTN); + admin.disableTable(dataTN); + admin.deleteTable(dataTN); + admin.createTable(dataTD, splitPoints); + assertEquals(targetNumRegions, admin.getTableRegions(dataTN).size()); + + // insert data where index column values start with a, b, c, d + int idCounter = 1; + try (PreparedStatement ps = conn.prepareStatement("UPSERT INTO " + dataTableFullName + + "(ID,\"info\".CAR_NUM,\"test\".CAR_NUM,CAP_DATE,ORG_ID,ORG_NAME) VALUES(?,?,?,'2016-01-01 00:00:00',11,'orgname1')")){ + for (String carNum : carNumPrefixes) { + for (int i = 0; i < 100; i++) { + ps.setString(1, idCounter++ + ""); + ps.setString(2, carNum + "_" + i); + ps.setString(3, "test-" + carNum + "_ " + i); + ps.addBatch(); + } + } + ps.executeBatch(); + conn.commit(); + } + + String indexDDL = + String.format( + "CREATE INDEX %s on %s 
(\"info\".CAR_NUM,\"test\".CAR_NUM,\"info\".CAP_DATE) ASYNC", + indexTableName, dataTableFullName); + conn.createStatement().execute(indexDDL); + + // run with 50% sampling rate, split if data table more than 3 regions + runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, "-sp", "50", "-spa", "3"); + + assertEquals(targetNumRegions, admin.getTableRegions(indexTN).size()); + List<Cell> values = new ArrayList<>(); + // every index region should have been written to, if the index table was properly split uniformly + for (HRegion region : getUtility().getHBaseCluster().getRegions(indexTN)) { + values.clear(); + try (RegionScanner scanner = region.getScanner(new Scan())) { + scanner.next(values); + } + if (values.isEmpty()) { + fail("Region did not have any results: " + region.getRegionInfo()); + } + } + } + } + public static void assertExplainPlan(boolean localIndex, String actualExplainPlan, String dataTableFullName, String indexTableFullName) { String expectedExplainPlan; @@ -297,13 +389,20 @@ public class IndexToolIT extends BaseTest { public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName, String dataTableName, String indexTableName) throws Exception { + runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, new String[0]); + } + + public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName, + String dataTableName, String indexTableName, String... 
additionalArgs) throws Exception { IndexTool indexingTool = new IndexTool(); Configuration conf = new Configuration(getUtility().getConfiguration()); conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); indexingTool.setConf(conf); final String[] cmdArgs = getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName); - int status = indexingTool.run(cmdArgs); + List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs)); + cmdArgList.addAll(Arrays.asList(additionalArgs)); + int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()])); assertEquals(0, status); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ab9b372/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index e3aa729..ac0be01 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -23,12 +23,12 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAM import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; +import java.io.IOException; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import 
org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -66,10 +67,14 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.phoenix.compile.PostIndexDDLCompiler; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.mapreduce.CsvBulkImportUtil; +import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames; import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder; import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; @@ -81,6 +86,8 @@ import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.EquiDepthStreamHistogram; +import org.apache.phoenix.util.EquiDepthStreamHistogram.Bucket; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; @@ -111,6 +118,24 @@ public class IndexTool extends Configured implements Tool { private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false, "If specified, we avoid the bulk load (optional)"); + + private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0; + + private static final Option SPLIT_INDEX_OPTION = + new Option("sp", "split", true, + "Split the index table before building, to have the same # of regions as the data table. 
" + + "The data table is sampled to get uniform index splits across the index values. " + + "Takes an optional argument specifying the sampling rate, " + + "otherwise defaults to " + DEFAULT_SPLIT_SAMPLING_RATE); + + private static final int DEFAULT_AUTOSPLIT_NUM_REGIONS = 20; + + private static final Option AUTO_SPLIT_INDEX_OPTION = + new Option("spa", "autosplit", true, + "Automatically split the index table if the # of data table regions is greater than N. " + + "Takes an optional argument specifying N, otherwise defaults to " + DEFAULT_AUTOSPLIT_NUM_REGIONS + + ". Can be used in conjunction with -split option to specify the sampling rate"); + private static final Option RUN_FOREGROUND_OPTION = new Option( "runfg", @@ -136,6 +161,10 @@ public class IndexTool extends Configured implements Tool { options.addOption(OUTPUT_PATH_OPTION); options.addOption(SNAPSHOT_OPTION); options.addOption(HELP_OPTION); + AUTO_SPLIT_INDEX_OPTION.setOptionalArg(true); + options.addOption(AUTO_SPLIT_INDEX_OPTION); + SPLIT_INDEX_OPTION.setOptionalArg(true); + options.addOption(SPLIT_INDEX_OPTION); return options; } @@ -181,6 +210,13 @@ public class IndexTool extends Configured implements Tool { throw new IllegalStateException(RUN_FOREGROUND_OPTION.getLongOpt() + " is applicable only for " + DIRECT_API_OPTION.getLongOpt()); } + boolean splitIndex = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt()) || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt()); + if (splitIndex && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) { + throw new IllegalStateException("Must pass an index name for the split index option"); + } + if (splitIndex && cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt())) { + throw new IllegalStateException("Cannot split index for a partial rebuild, as the index table is dropped"); + } return cmdLine; } @@ -510,6 +546,18 @@ public class IndexTool extends Configured implements Tool { isLocalIndexBuild = true; splitKeysBeforeJob = htable.getRegionLocator().getStartKeys(); } + // 
presplit the index table + boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt()); + boolean isSalted = pindexTable.getBucketNum() != null; // no need to split salted tables + if (!isSalted && IndexType.GLOBAL.equals(pindexTable.getIndexType()) + && (autosplit || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt()))) { + String nOpt = cmdLine.getOptionValue(AUTO_SPLIT_INDEX_OPTION.getOpt()); + int autosplitNumRegions = nOpt == null ? DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt); + String rateOpt = cmdLine.getOptionValue(SPLIT_INDEX_OPTION.getOpt()); + double samplingRate = rateOpt == null ? DEFAULT_SPLIT_SAMPLING_RATE : Double.parseDouble(rateOpt); + LOG.info(String.format("Will split index %s , autosplit=%s , autoSplitNumRegions=%s , samplingRate=%s", indexTable, autosplit, autosplitNumRegions, samplingRate)); + splitIndexTable(connection.unwrap(PhoenixConnection.class), qDataTable, pindexTable, autosplit, autosplitNumRegions, samplingRate); + } } PTable pdataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable); @@ -571,6 +619,96 @@ public class IndexTool extends Configured implements Tool { + private void splitIndexTable(PhoenixConnection pConnection, String qDataTable, + PTable pindexTable, boolean autosplit, int autosplitNumRegions, double samplingRate) + throws SQLException, IOException, IllegalArgumentException, InterruptedException { + final PTable pdataTable = PhoenixRuntime.getTable(pConnection, qDataTable); + int numRegions; + try (HTable hDataTable = + (HTable) pConnection.getQueryServices() + .getTable(pdataTable.getPhysicalName().getBytes())) { + numRegions = hDataTable.getRegionLocator().getStartKeys().length; + if (autosplit && !(numRegions > autosplitNumRegions)) { + LOG.info(String.format( + "Will not split index %s because the data table only has %s regions, autoSplitNumRegions=%s", + pindexTable.getPhysicalName(), numRegions, autosplitNumRegions)); + return; // do nothing if # of regions is too low + } + } + // 
build a tablesample query to fetch index column values from the data table + DataSourceColNames colNames = new DataSourceColNames(pdataTable, pindexTable); + String qTableSample = String.format(Locale.ROOT, "%s TABLESAMPLE(%.2f)", qDataTable, samplingRate); // Locale.ROOT: always '.' as decimal separator in the SQL; table name is an argument, not part of the format string + List<String> dataColNames = colNames.getDataColNames(); + final String dataSampleQuery = + QueryUtil.constructSelectStatement(qTableSample, dataColNames, null, + Hint.NO_INDEX, true); + IndexMaintainer maintainer = IndexMaintainer.create(pdataTable, pindexTable, pConnection); + ImmutableBytesWritable dataRowKeyPtr = new ImmutableBytesWritable(); + try (final PhoenixResultSet rs = + pConnection.createStatement().executeQuery(dataSampleQuery) + .unwrap(PhoenixResultSet.class); + HBaseAdmin admin = pConnection.getQueryServices().getAdmin()) { + EquiDepthStreamHistogram histo = new EquiDepthStreamHistogram(numRegions); + ValueGetter getter = getIndexValueGetter(rs, dataColNames); + // loop over data table rows - build the index rowkey, put it in the histogram + while (rs.next()) { + rs.getCurrentRow().getKey(dataRowKeyPtr); + // regionStart/EndKey only needed for local indexes, so we pass null + byte[] indexRowKey = maintainer.buildRowKey(getter, dataRowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP); + histo.addValue(indexRowKey); + } + List<Bucket> buckets = histo.computeBuckets(); + // do the split + // to get the splits, we just need the right bound of every histogram bucket, excluding the last + byte[][] splitPoints = new byte[buckets.size() - 1][]; + int splitIdx = 0; + for (Bucket b : buckets.subList(0, buckets.size() - 1)) { + splitPoints[splitIdx++] = b.getRightBoundExclusive(); + } + // drop table and recreate with appropriate splits + TableName indexTN = TableName.valueOf(pindexTable.getPhysicalName().getBytes()); + HTableDescriptor descriptor = admin.getTableDescriptor(indexTN); + 
admin.disableTable(indexTN); + admin.deleteTable(indexTN); // NOTE(review): if createTable below throws (e.g. rejects the split keys), the index table stays deleted + admin.createTable(descriptor, splitPoints); + } + } + + // setup a 
ValueGetter to get index values from the ResultSet + private ValueGetter getIndexValueGetter(final PhoenixResultSet rs, List<String> dataColNames) { + // map from data col name to index in ResultSet + final Map<String, Integer> rsIndex = new HashMap<>(dataColNames.size()); + int i = 1; + for (String dataCol : dataColNames) { + rsIndex.put(SchemaUtil.getEscapedFullColumnName(dataCol), i++); + } + ValueGetter getter = new ValueGetter() { + final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable(); + final ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable(); + + @Override + public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException { + try { + String fullColumnName = + SchemaUtil.getEscapedFullColumnName(SchemaUtil + .getColumnDisplayName(ref.getFamily(), ref.getQualifier())); + byte[] colVal = rs.getBytes(rsIndex.get(fullColumnName)); + valuePtr.set(colVal == null ? ByteUtil.EMPTY_BYTE_ARRAY : colVal); // SQL NULL -> empty value; set(null) would throw NPE + } catch (SQLException e) { + throw new IOException(e); + } + return valuePtr; + } + + @Override + public byte[] getRowKey() { + rs.getCurrentRow().getKey(rowKeyPtr); + return ByteUtil.copyKeyBytesIfNecessary(rowKeyPtr); + } + }; + return getter; + } + private boolean validateSplitForLocalIndex(byte[][] splitKeysBeforeJob, HTable htable) throws Exception { if (splitKeysBeforeJob != null && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, htable.getRegionLocator().getStartKeys())) {