PHOENIX-4704 Presplit index tables when building asynchronously

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6ab9b372
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6ab9b372
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6ab9b372

Branch: refs/heads/4.x-HBase-1.4
Commit: 6ab9b372f16f37b11e657b6803c6a60007815824
Parents: cb17adb
Author: Vincent Poon <vincentp...@apache.org>
Authored: Fri May 18 11:22:26 2018 -0700
Committer: Vincent Poon <vincentp...@apache.org>
Committed: Fri May 18 16:42:53 2018 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/IndexToolIT.java | 106 +++++++++++++-
 .../phoenix/mapreduce/index/IndexTool.java      | 142 ++++++++++++++++++-
 2 files changed, 242 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
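
A note on usage: the new flags surface through IndexTool's command line. The
sketch below shows one way to drive the tool with presplitting enabled,
mirroring the integration test in this commit; the -s/-dt/-it option names
are assumed from the 4.x IndexTool and worth confirming against --help.

    // Sketch only: runs the async index build with the new presplit flags.
    // Schema/table names are placeholders; "-sp 50 -spa 3" matches the IT below.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class PresplitIndexBuildExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            int status = ToolRunner.run(conf, new IndexTool(), new String[] {
                    "-s", "MY_SCHEMA",   // assumed option name for the schema
                    "-dt", "MY_TABLE",   // assumed option name for the data table
                    "-it", "MY_INDEX",   // assumed option name for the ASYNC index
                    "-direct",           // build via the direct API, avoiding the bulk load
                    "-sp", "50",         // new: sample 50% of data rows for split points
                    "-spa", "3"          // new: presplit only if the data table has > 3 regions
            });
            System.exit(status);
        }
    }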


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ab9b372/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index afb6d72..a120aaa 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -21,12 +21,15 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -34,8 +37,16 @@ import java.util.Properties;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -54,7 +65,7 @@ import com.google.common.collect.Maps;
 
 @RunWith(Parameterized.class)
 @Category(NeedsOwnMiniClusterTest.class)
-public class IndexToolIT extends BaseTest {
+public class IndexToolIT extends ParallelStatsEnabledIT {
 
     private final boolean localIndex;
     private final boolean transactional;
@@ -85,7 +96,7 @@ public class IndexToolIT extends BaseTest {
     }
 
     @BeforeClass
-    public static void doSetup() throws Exception {
+    public static void setup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
         serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
             QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
@@ -249,6 +260,86 @@ public class IndexToolIT extends BaseTest {
         }
     }
 
+    /**
+     * Test presplitting an index table
+     */
+    @Test
+    public void testSplitIndex() throws Exception {
+        if (localIndex) return; // can't split local indexes
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        final TableName dataTN = TableName.valueOf(dataTableFullName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        TableName indexTN = TableName.valueOf(indexTableFullName);
+        try (Connection conn =
+                DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
+                HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            String dataDDL =
+                    "CREATE TABLE " + dataTableFullName + "(\n"
+                            + "ID VARCHAR NOT NULL PRIMARY KEY,\n"
+                            + "\"info\".CAR_NUM VARCHAR(18) NULL,\n"
+                            + "\"test\".CAR_NUM VARCHAR(18) NULL,\n"
+                            + "\"info\".CAP_DATE VARCHAR NULL,\n" + "\"info\".ORG_ID BIGINT NULL,\n"
+                            + "\"info\".ORG_NAME VARCHAR(255) NULL\n" + ") COLUMN_ENCODED_BYTES = 0";
+            conn.createStatement().execute(dataDDL);
+
+            String[] carNumPrefixes = new String[] {"a", "b", "c", "d"};
+
+            // split the data table, as the tool splits the index table to have the same # of regions
+            // doesn't really matter what the split points are, we just want a target # of regions
+            int numSplits = carNumPrefixes.length;
+            int targetNumRegions = numSplits + 1;
+            byte[][] splitPoints = new byte[numSplits][];
+            for (String prefix : carNumPrefixes) {
+                splitPoints[--numSplits] = Bytes.toBytes(prefix);
+            }
+            HTableDescriptor dataTD = admin.getTableDescriptor(dataTN);
+            admin.disableTable(dataTN);
+            admin.deleteTable(dataTN);
+            admin.createTable(dataTD, splitPoints);
+            assertEquals(targetNumRegions, admin.getTableRegions(dataTN).size());
+
+            // insert data where index column values start with a, b, c, d
+            int idCounter = 1;
+            try (PreparedStatement ps = conn.prepareStatement("UPSERT INTO " + dataTableFullName
+                + "(ID,\"info\".CAR_NUM,\"test\".CAR_NUM,CAP_DATE,ORG_ID,ORG_NAME) VALUES(?,?,?,'2016-01-01 00:00:00',11,'orgname1')")){
+                for (String carNum : carNumPrefixes) {
+                    for (int i = 0; i < 100; i++) {
+                        ps.setString(1, idCounter++ + "");
+                        ps.setString(2, carNum + "_" + i);
+                        ps.setString(3, "test-" + carNum + "_ " + i);
+                        ps.addBatch();
+                    }
+                }
+                ps.executeBatch();
+                conn.commit();
+            }
+
+            String indexDDL =
+                    String.format(
+                        "CREATE INDEX %s on %s (\"info\".CAR_NUM,\"test\".CAR_NUM,\"info\".CAP_DATE) ASYNC",
+                        indexTableName, dataTableFullName);
+            conn.createStatement().execute(indexDDL);
+
+            // run with 50% sampling rate, split if data table more than 3 regions
+            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, "-sp", "50", "-spa", "3");
+
+            assertEquals(targetNumRegions, admin.getTableRegions(indexTN).size());
+            List<Cell> values = new ArrayList<>();
+            // every index region should have been written to, if the index table was properly split uniformly
+            for (HRegion region : getUtility().getHBaseCluster().getRegions(indexTN)) {
+                values.clear();
+                RegionScanner scanner = region.getScanner(new Scan());
+                scanner.next(values);
+                if (values.isEmpty()) {
+                    fail("Region did not have any results: " + region.getRegionInfo());
+                }
+            }
+        }
+    }
+
     public static void assertExplainPlan(boolean localIndex, String actualExplainPlan,
             String dataTableFullName, String indexTableFullName) {
         String expectedExplainPlan;
@@ -297,13 +388,20 @@ public class IndexToolIT extends BaseTest {
 
     public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
             String dataTableName, String indexTableName) throws Exception {
+        runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, new String[0]);
+    }
+
+    public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
+            String dataTableName, String indexTableName, String... additionalArgs) throws Exception {
         IndexTool indexingTool = new IndexTool();
         Configuration conf = new Configuration(getUtility().getConfiguration());
         conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
         indexingTool.setConf(conf);
         final String[] cmdArgs =
                 getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName);
-        int status = indexingTool.run(cmdArgs);
+        List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
+        cmdArgList.addAll(Arrays.asList(additionalArgs));
+        int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()]));
         assertEquals(0, status);
     }
 }
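
One subtlety in the IndexTool changes that follow: both new options are
registered with Option.setOptionalArg(true), so a bare -sp or -spa is legal,
getOptionValue(...) returns null, and the tool falls back to its defaults
(10.0 and 20). A minimal standalone commons-cli sketch of that behavior,
assuming the 1.x-era GnuParser that Phoenix 4.x bundles:

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;

    public class OptionalArgDemo {
        public static void main(String[] args) throws Exception {
            Option split = new Option("sp", "split", true, "sampling rate (optional)");
            split.setOptionalArg(true); // a bare "-sp" is accepted with no value
            Options options = new Options();
            options.addOption(split);

            CommandLine cmd = new GnuParser().parse(options, new String[] { "-sp" });
            String rate = cmd.getOptionValue(split.getOpt()); // null: no value was given
            double samplingRate = rate == null ? 10.0 : Double.parseDouble(rate);
            System.out.println("samplingRate=" + samplingRate); // prints 10.0
        }
    }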

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ab9b372/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index e3aa729..ac0be01 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -23,12 +23,12 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAM
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -66,10 +67,14 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames;
 import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
@@ -81,6 +86,8 @@ import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.EquiDepthStreamHistogram;
+import org.apache.phoenix.util.EquiDepthStreamHistogram.Bucket;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
@@ -111,6 +118,24 @@ public class IndexTool extends Configured implements Tool {
     
     private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false,
             "If specified, we avoid the bulk load (optional)");
+
+    private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0;
+
+    private static final Option SPLIT_INDEX_OPTION =
+            new Option("sp", "split", true,
+                    "Split the index table before building, to have the same # of regions as the data table.  "
+                    + "The data table is sampled to get uniform index splits across the index values.  "
+                    + "Takes an optional argument specifying the sampling rate,"
+                    + "otherwise defaults to " + DEFAULT_SPLIT_SAMPLING_RATE);
+
+    private static final int DEFAULT_AUTOSPLIT_NUM_REGIONS = 20;
+
+    private static final Option AUTO_SPLIT_INDEX_OPTION =
+            new Option("spa", "autosplit", true,
+                    "Automatically split the index table if the # of data table regions is greater than N. "
+                    + "Takes an optional argument specifying N, otherwise defaults to " + DEFAULT_AUTOSPLIT_NUM_REGIONS
+                    + ".  Can be used in conjunction with -split option to specify the sampling rate");
+
     private static final Option RUN_FOREGROUND_OPTION =
             new Option(
                     "runfg",
@@ -136,6 +161,10 @@ public class IndexTool extends Configured implements Tool {
         options.addOption(OUTPUT_PATH_OPTION);
         options.addOption(SNAPSHOT_OPTION);
         options.addOption(HELP_OPTION);
+        AUTO_SPLIT_INDEX_OPTION.setOptionalArg(true);
+        options.addOption(AUTO_SPLIT_INDEX_OPTION);
+        SPLIT_INDEX_OPTION.setOptionalArg(true);
+        options.addOption(SPLIT_INDEX_OPTION);
         return options;
     }
 
@@ -181,6 +210,13 @@ public class IndexTool extends Configured implements Tool {
             throw new IllegalStateException(RUN_FOREGROUND_OPTION.getLongOpt()
                     + " is applicable only for " + DIRECT_API_OPTION.getLongOpt());
         }
+        boolean splitIndex = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt()) || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt());
+        if (splitIndex && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
+            throw new IllegalStateException("Must pass an index name for the split index option");
+        }
+        if (splitIndex && cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt())) {
+            throw new IllegalStateException("Cannot split index for a partial rebuild, as the index table is dropped");
+        }
+        }
         return cmdLine;
     }
 
@@ -510,6 +546,18 @@ public class IndexTool extends Configured implements Tool {
                     isLocalIndexBuild = true;
                 splitKeysBeforeJob = htable.getRegionLocator().getStartKeys();
                 }
+                // presplit the index table
+                boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt());
+                boolean isSalted = pindexTable.getBucketNum() != null; // no need to split salted tables
+                if (!isSalted && IndexType.GLOBAL.equals(pindexTable.getIndexType())
+                        && (autosplit || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt()))) {
+                    String nOpt = cmdLine.getOptionValue(AUTO_SPLIT_INDEX_OPTION.getOpt());
+                    int autosplitNumRegions = nOpt == null ? DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt);
+                    String rateOpt = cmdLine.getOptionValue(SPLIT_INDEX_OPTION.getOpt());
+                    double samplingRate = rateOpt == null ? DEFAULT_SPLIT_SAMPLING_RATE : Double.parseDouble(rateOpt);
+                    LOG.info(String.format("Will split index %s , autosplit=%s , autoSplitNumRegions=%s , samplingRate=%s", indexTable, autosplit, autosplitNumRegions, samplingRate));
+                    splitIndexTable(connection.unwrap(PhoenixConnection.class), qDataTable, pindexTable, autosplit, autosplitNumRegions, samplingRate);
+                }
             }
             
             PTable pdataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
@@ -571,6 +619,96 @@ public class IndexTool extends Configured implements Tool {
 
     
 
+    private void splitIndexTable(PhoenixConnection pConnection, String qDataTable,
+            PTable pindexTable, boolean autosplit, int autosplitNumRegions, double samplingRate)
+            throws SQLException, IOException, IllegalArgumentException, InterruptedException {
+        final PTable pdataTable = PhoenixRuntime.getTable(pConnection, qDataTable);
+        int numRegions;
+        try (HTable hDataTable =
+                (HTable) pConnection.getQueryServices()
+                        .getTable(pdataTable.getPhysicalName().getBytes())) {
+            numRegions = hDataTable.getRegionLocator().getStartKeys().length;
+            if (autosplit && !(numRegions > autosplitNumRegions)) {
+                LOG.info(String.format(
+                    "Will not split index %s because the data table only has %s regions, autoSplitNumRegions=%s",
+                    pindexTable.getPhysicalName(), numRegions, autosplitNumRegions));
+                return; // do nothing if # of regions is too low
+            }
+        }
+        // build a tablesample query to fetch index column values from the data table
+        DataSourceColNames colNames = new DataSourceColNames(pdataTable, pindexTable);
+        String qTableSample = String.format(qDataTable + " TABLESAMPLE(%.2f)", samplingRate);
+        List<String> dataColNames = colNames.getDataColNames();
+        final String dataSampleQuery =
+                QueryUtil.constructSelectStatement(qTableSample, dataColNames, null,
+                    Hint.NO_INDEX, true);
+        IndexMaintainer maintainer = IndexMaintainer.create(pdataTable, pindexTable, pConnection);
+        ImmutableBytesWritable dataRowKeyPtr = new ImmutableBytesWritable();
+        try (final PhoenixResultSet rs =
+                pConnection.createStatement().executeQuery(dataSampleQuery)
+                        .unwrap(PhoenixResultSet.class);
+                HBaseAdmin admin = pConnection.getQueryServices().getAdmin()) {
+            EquiDepthStreamHistogram histo = new EquiDepthStreamHistogram(numRegions);
+            ValueGetter getter = getIndexValueGetter(rs, dataColNames);
+            // loop over data table rows - build the index rowkey, put it in the histogram
+            while (rs.next()) {
+                rs.getCurrentRow().getKey(dataRowKeyPtr);
+                // regionStart/EndKey only needed for local indexes, so we pass null
+                byte[] indexRowKey = maintainer.buildRowKey(getter, dataRowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP);
+                histo.addValue(indexRowKey);
+            }
+            List<Bucket> buckets = histo.computeBuckets();
+            // do the split
+            // to get the splits, we just need the right bound of every histogram bucket, excluding the last
+            byte[][] splitPoints = new byte[buckets.size() - 1][];
+            int splitIdx = 0;
+            for (Bucket b : buckets.subList(0, buckets.size() - 1)) {
+                splitPoints[splitIdx++] = b.getRightBoundExclusive();
+            }
+            // drop table and recreate with appropriate splits
+            TableName indexTN = TableName.valueOf(pindexTable.getPhysicalName().getBytes());
+            HTableDescriptor descriptor = admin.getTableDescriptor(indexTN);
+            admin.disableTable(indexTN);
+            admin.deleteTable(indexTN);
+            admin.createTable(descriptor, splitPoints);
+        }
+    }
+
+    // setup a ValueGetter to get index values from the ResultSet
+    private ValueGetter getIndexValueGetter(final PhoenixResultSet rs, List<String> dataColNames) {
+        // map from data col name to index in ResultSet
+        final Map<String, Integer> rsIndex = new HashMap<>(dataColNames.size());
+        int i = 1;
+        for (String dataCol : dataColNames) {
+            rsIndex.put(SchemaUtil.getEscapedFullColumnName(dataCol), i++);
+        }
+        ValueGetter getter = new ValueGetter() {
+            final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
+            final ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable();
+
+            @Override
+            public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+                try {
+                    String fullColumnName =
+                            SchemaUtil.getEscapedFullColumnName(SchemaUtil
+                                    .getColumnDisplayName(ref.getFamily(), ref.getQualifier()));
+                    byte[] colVal = rs.getBytes(rsIndex.get(fullColumnName));
+                    valuePtr.set(colVal);
+                } catch (SQLException e) {
+                    throw new IOException(e);
+                }
+                return valuePtr;
+            }
+
+            @Override
+            public byte[] getRowKey() {
+                rs.getCurrentRow().getKey(rowKeyPtr);
+                return ByteUtil.copyKeyBytesIfNecessary(rowKeyPtr);
+            }
+        };
+        return getter;
+    }
+
     private boolean validateSplitForLocalIndex(byte[][] splitKeysBeforeJob, HTable htable) throws Exception {
         if (splitKeysBeforeJob != null
                 && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, htable.getRegionLocator().getStartKeys())) {

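To recap the core of splitIndexTable() above: it samples the data table with
TABLESAMPLE, streams each would-be index row key into an
EquiDepthStreamHistogram sized to the data table's region count, and
recreates the index table using every bucket's exclusive right bound except
the last as a split point. A condensed sketch of that computation, reusing
the same Phoenix histogram calls as the diff; the wrapper method itself is
hypothetical:

    import java.util.List;

    import org.apache.phoenix.util.EquiDepthStreamHistogram;
    import org.apache.phoenix.util.EquiDepthStreamHistogram.Bucket;

    public class IndexSplitPointSketch {
        // Hypothetical helper: derive (numRegions - 1) split points from sampled keys.
        static byte[][] computeSplitPoints(Iterable<byte[]> sampledIndexRowKeys, int numRegions) {
            // One bucket per target region; the histogram balances row counts per bucket.
            EquiDepthStreamHistogram histo = new EquiDepthStreamHistogram(numRegions);
            for (byte[] indexRowKey : sampledIndexRowKeys) {
                histo.addValue(indexRowKey);
            }
            List<Bucket> buckets = histo.computeBuckets();
            // Each bucket's right bound, excluding the last, becomes a split point, so
            // createTable(descriptor, splitPoints) yields roughly equal-depth regions.
            byte[][] splitPoints = new byte[buckets.size() - 1][];
            for (int i = 0; i < buckets.size() - 1; i++) {
                splitPoints[i] = buckets.get(i).getRightBoundExclusive();
            }
            return splitPoints;
        }
    }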