This is an automated email from the ASF dual-hosted git repository. rajeshbabu pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new d880f32 PHOENIX-5794 Create a threshold for non async index creation, that can be modified in configs(Richard Antal) d880f32 is described below commit d880f32f1e111ff407fa3b383d1d0ea37dc19e5b Author: Rajeshbabu <raj...@sahasram.com> AuthorDate: Tue Apr 7 19:42:08 2020 +0530 PHOENIX-5794 Create a threshold for non async index creation, that can be modified in configs(Richard Antal) --- .../end2end/index/IndexAsyncThresholdIT.java | 207 +++++++++++++++++++++ .../apache/phoenix/exception/SQLExceptionCode.java | 3 + .../org/apache/phoenix/query/QueryServices.java | 1 + .../apache/phoenix/query/QueryServicesOptions.java | 3 + .../org/apache/phoenix/schema/MetaDataClient.java | 38 ++++ pom.xml | 4 +- 6 files changed, 254 insertions(+), 2 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java new file mode 100644 index 0000000..6cd2eaf --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PMetaData; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.ServerUtil.ConnectionFactory; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.AfterParam; +import org.junit.runners.Parameterized.BeforeParam; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; + + +@RunWith(Parameterized.class) +@Category(NeedsOwnMiniClusterTest.class) +public class IndexAsyncThresholdIT extends BaseTest { + + private static final Logger logger = LoggerFactory.getLogger(IndexAsyncThresholdIT.class); + + private final String tableName; + private final long rows; + private final long columns; + private final boolean overThreshold; + private final Mode mode; + + enum Mode{ + NORMAL, + ASYNC, + COVERED, + FUNCTIONAL + } + + public IndexAsyncThresholdIT(Long threshold, Long rows, Long columns, Long overThreshold, + Long mode) + throws Exception { + this.tableName = generateUniqueName(); + this.rows = rows; + this.columns = columns; + this.overThreshold = overThreshold == 0; + this.mode = mode.equals(0L) ? Mode.NORMAL : + mode.equals(1L) ? Mode.ASYNC : + mode.equals(2L) ? Mode.COVERED : + Mode.FUNCTIONAL; + } + + @Parameterized.Parameters + public static synchronized Collection<Long[]> primeNumbers() { + return Arrays.asList(new Long[][]{ + {100000L, 5000L, 10L, 0L, 0L}, + {Long.MAX_VALUE, 200L, 100L, 1L, 0L}, + {0L, 20L, 10L, 1L, 0L}, + {1L, 20L, 10L, 1L, 1L}, + {1L, 20L, 10L, 0L, 2L}, + {1L, 100L, 10L, 0L, 3L}, + }); + } + + @BeforeParam + public static synchronized void setupMiniCluster(Long threshold, Long rows, Long columns, + Long overThreshold, Long mode) + throws Exception { + Configuration conf = HBaseConfiguration.create(); + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + props.put(QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD, Long.toString(threshold)); + url = setUpTestCluster(conf, new ReadOnlyProps(props.entrySet().iterator())); + driver = initAndRegisterTestDriver(url, new ReadOnlyProps(props.entrySet().iterator())); + } + + @AfterParam + public static synchronized void tearDownMiniCluster() throws Exception { + destroyDriver(driver); + try { + HBaseTestingUtility u = new HBaseTestingUtility(); + u.shutdownMiniCluster(); + } catch (Throwable t) { + logger.error("Exception caught when shutting down mini cluster", t); + } finally { + ConnectionFactory.shutdown(); + } + } + + @Test + public void testAsyncIndexCreation() throws Exception { + try (Connection connection = driver.connect(url, new Properties())) { + Statement stmt = connection.createStatement(); + String indexName = "INDEX" + this.tableName; + createAndPopulateTable(connection, this.tableName, rows, columns); + connection.createStatement().execute("UPDATE STATISTICS " + this.tableName); + connection.commit(); + ResultSet rs = stmt.executeQuery("select count(*) from " + this.tableName); + assertTrue(rs.next()); + assertEquals(rows, rs.getInt(1)); + + SQLException exception = null; + try { + String statement = "create index " + indexName + " ON " + this.tableName; + if (this.mode == Mode.NORMAL || this.mode == Mode.ASYNC){ + statement += " (col2, col5, col6, col7, col8)"; + if(this.mode == Mode.ASYNC){ + statement += " ASYNC"; + } + } + else if(this.mode == Mode.COVERED){ + statement += " (col2) INCLUDE(col5, col6, col7, col8)"; + } + else { // mode == Functional + statement += " (UPPER(col2 || col4))"; + } + + stmt.execute(statement); + } catch (Exception e) { + assert e instanceof SQLException; + exception = (SQLException) e; + } + connection.commit(); + PTableKey key = new PTableKey(null, this.tableName); + PMetaData metaCache = connection.unwrap(PhoenixConnection.class).getMetaDataCache(); + List<PTable> indexes = metaCache.getTableRef(key).getTable().getIndexes(); + if (!overThreshold) { + if(this.mode == Mode.ASYNC){ + assertEquals(PIndexState.BUILDING, indexes.get(0).getIndexState()); + } + else { + assertEquals(PIndexState.ACTIVE, indexes.get(0).getIndexState()); + } + assertNull(exception); + } else { + assertEquals(0, indexes.size()); + assertNotNull(exception); + assertEquals(exception.getErrorCode(), + SQLExceptionCode.ABOVE_INDEX_NON_ASYNC_THRESHOLD.getErrorCode()); + } + } + } + + private void createAndPopulateTable(Connection conn, String fullTableName, Long rows, + Long columns) + throws SQLException { + Statement stmt = conn.createStatement(); + StringBuilder ddl = new StringBuilder("CREATE TABLE " + fullTableName + + " (col1 varchar primary key"); + for (int i = 2; i< columns; i++){ + ddl.append(", col").append(i).append(" varchar"); + } + ddl.append(")"); + stmt.execute(ddl.toString()); + conn.commit(); + for (int i = 0; i < rows; i++) { + StringBuilder dml = new StringBuilder("upsert into " + fullTableName + " values ("); + for (int j = 1; j < columns; j++) { + dml.append("'col").append(j).append("VAL").append(i).append("'"); + if(j < columns -1){ + dml.append(", "); + } + } + dml.append(")"); + stmt.execute(dml.toString()); + } + conn.commit(); + } +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 14fb249..7c211df 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -331,6 +331,9 @@ public enum SQLExceptionCode { + " property can only be set for views"), CANNOT_SET_OR_ALTER_VIEW_TTL_FOR_TABLE_WITH_TTL(10952, "44A33", "Cannot set or alter " + PhoenixDatabaseMetaData.VIEW_TTL + " property on an table with TTL,"), + ABOVE_INDEX_NON_ASYNC_THRESHOLD(1097, "44A34", "The estimated read size for index creation " + + "is higher than " + QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD+ ". You can edit the" + + " limit or create ASYNC index."), /** Sequence related */ SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 18c05ce..34af089 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -66,6 +66,7 @@ public interface QueryServices extends SQLCloseable { // joni byte regex engine setting public static final String USE_BYTE_BASED_REGEX_ATTRIB = "phoenix.regex.byteBased"; public static final String DRIVER_SHUTDOWN_TIMEOUT_MS = "phoenix.shutdown.timeoutMs"; + public static final String CLIENT_INDEX_ASYNC_THRESHOLD = "phoenix.index.async.threshold"; /** * max size to spool the the result into diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index f474c16..4e62e25 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -101,6 +101,7 @@ import static org.apache.phoenix.query.QueryServices.UPLOAD_BINARY_DATA_TYPE_ENC import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB; import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB; import static org.apache.phoenix.query.QueryServices.USE_STATS_FOR_PARALLELIZATION; +import static org.apache.phoenix.query.QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD; import java.util.HashSet; import java.util.Map.Entry; @@ -161,6 +162,7 @@ public class QueryServicesOptions { public static final int DEFAULT_TRACING_BATCH_SIZE = 100; public static final int DEFAULT_TRACING_TRACE_BUFFER_SIZE = 1000; public static final int DEFAULT_MAX_INDEXES_PER_TABLE = 10; + public static final int DEFAULT_CLIENT_INDEX_ASYNC_THRESHOLD = 0; public final static int DEFAULT_MUTATE_BATCH_SIZE = 100; // Batch size for UPSERT SELECT and DELETE //Batch size in bytes for UPSERT, SELECT and DELETE. By default, 2MB @@ -485,6 +487,7 @@ public class QueryServicesOptions { .setIfUnset(LOG_SAMPLE_RATE, DEFAULT_LOG_SAMPLE_RATE) .setIfUnset(TxConstants.TX_PRE_014_CHANGESET_KEY, Boolean.FALSE.toString()) .setIfUnset(CLIENT_METRICS_TAG, DEFAULT_CLIENT_METRICS_TAG) + .setIfUnset(CLIENT_INDEX_ASYNC_THRESHOLD, DEFAULT_CLIENT_INDEX_ASYNC_THRESHOLD) ; // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index ffc7970..e560a63 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -135,7 +135,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.HashSet; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import com.google.gson.JsonObject; import org.apache.hadoop.hbase.HConstants; @@ -167,6 +169,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; import org.apache.phoenix.coprocessor.MetaDataProtocol.SharedTableState; +import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.util.ViewUtil; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -1687,6 +1690,41 @@ public class MetaDataClient { } } + Configuration config = connection.getQueryServices().getConfiguration(); + long threshold = Long.parseLong(config.get(QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD)); + + if (threshold > 0 && !statement.isAsync()) { + Set<String> columnFamilies = new HashSet<>(); + for (ColumnDef column : columnDefs){ + try { + String columnFamily = IndexUtil + .getDataColumnFamilyName(column.getColumnDefName().getColumnName()); + columnFamilies.add(!columnFamily.equals("") ? columnFamily + : dataTable.getDefaultFamilyName()!= null ? + dataTable.getDefaultFamilyName().toString() + : QueryConstants.DEFAULT_COLUMN_FAMILY); + } catch (Exception ignored){ + ; // We ignore any exception during this phase + } + } + long estimatedBytes = 0; + for (String colFamily : columnFamilies) { + GuidePostsInfo gps = connection.getQueryServices().getTableStats( + new GuidePostsKey(Bytes.toBytes(tableRef.getTable().toString()), + Bytes.toBytes(colFamily))); + long[] byteCounts = gps.getByteCounts(); + for (long byteCount : byteCounts) { + estimatedBytes += byteCount; + } + + if (threshold < estimatedBytes) { + throw new SQLExceptionInfo + .Builder(SQLExceptionCode.ABOVE_INDEX_NON_ASYNC_THRESHOLD) + .build().buildException(); + } + } + } + // Set DEFAULT_COLUMN_FAMILY_NAME of index to match data table // We need this in the props so that the correct column family is created if (dataTable.getDefaultFamilyName() != null && dataTable.getType() != PTableType.VIEW && !allocateIndexId) { diff --git a/pom.xml b/pom.xml index 1a12109..896d945 100644 --- a/pom.xml +++ b/pom.xml @@ -130,7 +130,7 @@ <servlet.api.version>3.1.0</servlet.api.version> <!-- Test Dependencies --> <mockito-all.version>1.8.5</mockito-all.version> - <junit.version>4.12</junit.version> + <junit.version>4.13</junit.version> <!-- Plugin versions --> <maven-eclipse-plugin.version>2.9</maven-eclipse-plugin.version> @@ -1203,7 +1203,7 @@ </profile> </profiles> - <reporting> + <reporting> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId>