This is an automated email from the ASF dual-hosted git repository.

rajeshbabu pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new d880f32  PHOENIX-5794 Create a threshold for non async index creation, that can be modified in configs(Richard Antal)
d880f32 is described below

commit d880f32f1e111ff407fa3b383d1d0ea37dc19e5b
Author: Rajeshbabu <raj...@sahasram.com>
AuthorDate: Tue Apr 7 19:42:08 2020 +0530

    PHOENIX-5794 Create a threshold for non async index creation, that can be modified in configs(Richard Antal)
---
 .../end2end/index/IndexAsyncThresholdIT.java       | 207 +++++++++++++++++++++
 .../apache/phoenix/exception/SQLExceptionCode.java |   3 +
 .../org/apache/phoenix/query/QueryServices.java    |   1 +
 .../apache/phoenix/query/QueryServicesOptions.java |   3 +
 .../org/apache/phoenix/schema/MetaDataClient.java  |  38 ++++
 pom.xml                                            |   4 +-
 6 files changed, 254 insertions(+), 2 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java
new file mode 100644
index 0000000..6cd2eaf
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexAsyncThresholdIT.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PMetaData;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ServerUtil.ConnectionFactory;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.AfterParam;
+import org.junit.runners.Parameterized.BeforeParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
+
+
+@RunWith(Parameterized.class)
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexAsyncThresholdIT extends BaseTest {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(IndexAsyncThresholdIT.class);
+
+    private final String tableName;
+    private final long rows;
+    private final long columns;
+    private final boolean overThreshold;
+    private final Mode mode;
+
+    enum Mode{
+        NORMAL,
+        ASYNC,
+        COVERED,
+        FUNCTIONAL
+        }
+
+    public IndexAsyncThresholdIT(Long threshold, Long rows, Long columns, Long 
overThreshold,
+                                 Long mode)
+            throws Exception {
+        this.tableName = generateUniqueName();
+        this.rows = rows;
+        this.columns = columns;
+        this.overThreshold = overThreshold == 0;
+        this.mode = mode.equals(0L) ? Mode.NORMAL :
+                mode.equals(1L) ? Mode.ASYNC :
+                        mode.equals(2L) ? Mode.COVERED :
+                                Mode.FUNCTIONAL;
+    }
+
+    @Parameterized.Parameters
+    public static synchronized Collection<Long[]>  primeNumbers() {
+        return Arrays.asList(new Long[][]{
+                {100000L, 5000L, 10L, 0L, 0L},
+                {Long.MAX_VALUE, 200L, 100L, 1L, 0L},
+                {0L, 20L, 10L, 1L, 0L},
+                {1L, 20L, 10L, 1L, 1L},
+                {1L, 20L, 10L, 0L, 2L},
+                {1L, 100L, 10L, 0L, 3L},
+        });
+    }
+
+    @BeforeParam
+    public static synchronized void setupMiniCluster(Long threshold, Long 
rows, Long columns,
+                                                     Long overThreshold, Long 
mode)
+            throws Exception {
+        Configuration conf = HBaseConfiguration.create();
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD, 
Long.toString(threshold));
+        url = setUpTestCluster(conf, new 
ReadOnlyProps(props.entrySet().iterator()));
+        driver = initAndRegisterTestDriver(url,  new 
ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @AfterParam
+    public static synchronized void tearDownMiniCluster() throws Exception {
+        destroyDriver(driver);
+        try {
+            HBaseTestingUtility u = new HBaseTestingUtility();
+            u.shutdownMiniCluster();
+        } catch (Throwable t) {
+            logger.error("Exception caught when shutting down mini cluster", 
t);
+        } finally {
+            ConnectionFactory.shutdown();
+        }
+    }
+
+    @Test
+    public void testAsyncIndexCreation() throws Exception {
+        try (Connection connection = driver.connect(url, new Properties())) {
+            Statement stmt = connection.createStatement();
+            String indexName = "INDEX" + this.tableName;
+            createAndPopulateTable(connection, this.tableName, rows, columns);
+            connection.createStatement().execute("UPDATE STATISTICS " + 
this.tableName);
+            connection.commit();
+            ResultSet rs = stmt.executeQuery("select count(*) from " + 
this.tableName);
+            assertTrue(rs.next());
+            assertEquals(rows, rs.getInt(1));
+
+            SQLException exception = null;
+            try {
+                String statement = "create index " + indexName + " ON " + 
this.tableName;
+                if (this.mode == Mode.NORMAL || this.mode == Mode.ASYNC){
+                    statement += " (col2, col5, col6, col7, col8)";
+                    if(this.mode == Mode.ASYNC){
+                        statement += "  ASYNC";
+                    }
+                }
+                else if(this.mode == Mode.COVERED){
+                    statement += " (col2) INCLUDE(col5, col6, col7, col8)";
+                }
+                else {  // mode == Functional
+                    statement += " (UPPER(col2 || col4))";
+                }
+
+                stmt.execute(statement);
+            } catch (Exception e) {
+                assert e instanceof SQLException;
+                exception = (SQLException) e;
+            }
+            connection.commit();
+            PTableKey key = new PTableKey(null, this.tableName);
+            PMetaData metaCache = 
connection.unwrap(PhoenixConnection.class).getMetaDataCache();
+            List<PTable> indexes = 
metaCache.getTableRef(key).getTable().getIndexes();
+            if (!overThreshold) {
+                if(this.mode == Mode.ASYNC){
+                    assertEquals(PIndexState.BUILDING, 
indexes.get(0).getIndexState());
+                }
+                else {
+                    assertEquals(PIndexState.ACTIVE, 
indexes.get(0).getIndexState());
+                }
+                assertNull(exception);
+            } else {
+                assertEquals(0, indexes.size());
+                assertNotNull(exception);
+                assertEquals(exception.getErrorCode(),
+                        
SQLExceptionCode.ABOVE_INDEX_NON_ASYNC_THRESHOLD.getErrorCode());
+            }
+        }
+    }
+
+    private void createAndPopulateTable(Connection conn, String fullTableName, 
Long rows,
+                                        Long columns)
+            throws SQLException {
+        Statement stmt = conn.createStatement();
+        StringBuilder ddl = new StringBuilder("CREATE TABLE " + fullTableName
+                + " (col1 varchar primary key");
+        for (int i = 2; i< columns; i++){
+            ddl.append(", col").append(i).append(" varchar");
+        }
+        ddl.append(")");
+        stmt.execute(ddl.toString());
+        conn.commit();
+        for (int i = 0; i < rows; i++) {
+            StringBuilder dml = new StringBuilder("upsert into " + 
fullTableName + " values (");
+            for (int j = 1; j < columns; j++) {
+                
dml.append("'col").append(j).append("VAL").append(i).append("'");
+                if(j < columns -1){
+                    dml.append(", ");
+                }
+            }
+            dml.append(")");
+            stmt.execute(dml.toString());
+        }
+        conn.commit();
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 14fb249..7c211df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -331,6 +331,9 @@ public enum SQLExceptionCode {
             + " property can only be set for views"),
     CANNOT_SET_OR_ALTER_VIEW_TTL_FOR_TABLE_WITH_TTL(10952, "44A33", "Cannot 
set or alter "
             + PhoenixDatabaseMetaData.VIEW_TTL + " property on an table with 
TTL,"),
+    ABOVE_INDEX_NON_ASYNC_THRESHOLD(1097, "44A34", "The estimated read size 
for index creation "
+            + "is higher than " + QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD+ 
". You can edit the"
+            + " limit or create ASYNC index."),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new 
Factory() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 18c05ce..34af089 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -66,6 +66,7 @@ public interface QueryServices extends SQLCloseable {
     // joni byte regex engine setting
     public static final String USE_BYTE_BASED_REGEX_ATTRIB = 
"phoenix.regex.byteBased";
     public static final String DRIVER_SHUTDOWN_TIMEOUT_MS = 
"phoenix.shutdown.timeoutMs";
+    public static final String CLIENT_INDEX_ASYNC_THRESHOLD = 
"phoenix.index.async.threshold";
 
     /**
         * max size to spool the the result into
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index f474c16..4e62e25 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -101,6 +101,7 @@ import static 
org.apache.phoenix.query.QueryServices.UPLOAD_BINARY_DATA_TYPE_ENC
 import static 
org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.USE_STATS_FOR_PARALLELIZATION;
+import static 
org.apache.phoenix.query.QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD;
 
 import java.util.HashSet;
 import java.util.Map.Entry;
@@ -161,6 +162,7 @@ public class QueryServicesOptions {
     public static final int DEFAULT_TRACING_BATCH_SIZE = 100;
     public static final int DEFAULT_TRACING_TRACE_BUFFER_SIZE = 1000;
     public static final int DEFAULT_MAX_INDEXES_PER_TABLE = 10;
+    public static final int DEFAULT_CLIENT_INDEX_ASYNC_THRESHOLD = 0;
 
     public final static int DEFAULT_MUTATE_BATCH_SIZE = 100; // Batch size for 
UPSERT SELECT and DELETE
     //Batch size in bytes for UPSERT, SELECT and DELETE. By default, 2MB
@@ -485,6 +487,7 @@ public class QueryServicesOptions {
             .setIfUnset(LOG_SAMPLE_RATE,  DEFAULT_LOG_SAMPLE_RATE)
             .setIfUnset(TxConstants.TX_PRE_014_CHANGESET_KEY, 
Boolean.FALSE.toString())
             .setIfUnset(CLIENT_METRICS_TAG, DEFAULT_CLIENT_METRICS_TAG)
+            .setIfUnset(CLIENT_INDEX_ASYNC_THRESHOLD, 
DEFAULT_CLIENT_INDEX_ASYNC_THRESHOLD)
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user 
set
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ffc7970..e560a63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -135,7 +135,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.HashSet;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import com.google.gson.JsonObject;
 import org.apache.hadoop.hbase.HConstants;
@@ -167,6 +169,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.SharedTableState;
+import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.util.ViewUtil;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -1687,6 +1690,41 @@ public class MetaDataClient {
                 }
             }
 
+            Configuration config = 
connection.getQueryServices().getConfiguration();
+            long threshold = 
Long.parseLong(config.get(QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD));
+
+            if (threshold > 0 && !statement.isAsync()) {
+                Set<String> columnFamilies = new HashSet<>();
+                for (ColumnDef column : columnDefs){
+                    try {
+                        String columnFamily = IndexUtil
+                                
.getDataColumnFamilyName(column.getColumnDefName().getColumnName());
+                        columnFamilies.add(!columnFamily.equals("") ? 
columnFamily
+                                : dataTable.getDefaultFamilyName()!= null ?
+                                        
dataTable.getDefaultFamilyName().toString()
+                                        : 
QueryConstants.DEFAULT_COLUMN_FAMILY);
+                    } catch (Exception ignored){
+                        ; // We ignore any exception during this phase
+                    }
+                }
+                long estimatedBytes = 0;
+                for (String colFamily : columnFamilies) {
+                    GuidePostsInfo gps = 
connection.getQueryServices().getTableStats(
+                            new 
GuidePostsKey(Bytes.toBytes(tableRef.getTable().toString()),
+                                    Bytes.toBytes(colFamily)));
+                    long[] byteCounts = gps.getByteCounts();
+                    for (long byteCount : byteCounts) {
+                        estimatedBytes += byteCount;
+                    }
+
+                    if (threshold < estimatedBytes) {
+                        throw new SQLExceptionInfo
+                                
.Builder(SQLExceptionCode.ABOVE_INDEX_NON_ASYNC_THRESHOLD)
+                                .build().buildException();
+                    }
+                }
+            }
+
             // Set DEFAULT_COLUMN_FAMILY_NAME of index to match data table
             // We need this in the props so that the correct column family is 
created
             if (dataTable.getDefaultFamilyName() != null && 
dataTable.getType() != PTableType.VIEW && !allocateIndexId) {
diff --git a/pom.xml b/pom.xml
index 1a12109..896d945 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,7 +130,7 @@
     <servlet.api.version>3.1.0</servlet.api.version>
     <!-- Test Dependencies -->
     <mockito-all.version>1.8.5</mockito-all.version>
-    <junit.version>4.12</junit.version>
+    <junit.version>4.13</junit.version>
 
     <!-- Plugin versions -->
     <maven-eclipse-plugin.version>2.9</maven-eclipse-plugin.version>
@@ -1203,7 +1203,7 @@
     </profile>
   </profiles>
 
-  <reporting>
+ <reporting>
       <plugins>
           <plugin>
               <groupId>org.apache.maven.plugins</groupId>

Reply via email to