KYLIN-2803 Pushdown non select query

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/dbecb9b8
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/dbecb9b8
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/dbecb9b8

Branch: refs/heads/master
Commit: dbecb9b84bc90049acb3ef314128ac71f3516731
Parents: 480592d
Author: shi shaofeng <shishaofeng@shis-MacBook-Pro.local>
Authored: Fri Aug 25 16:18:21 2017 +0800
Committer: SHAOFENG SHI <shaofeng...@gmail.com>
Committed: Mon Aug 28 16:02:43 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  4 ++
 .../adhocquery/HivePushDownConverter.java       |  4 ++
 .../source/adhocquery/IPushDownRunner.java      | 13 ++++++
 .../query/adhoc/PushDownRunnerJdbcImpl.java     | 36 ++++++++++++-----
 .../apache/kylin/query/util/PushDownUtil.java   | 32 ++++++++++-----
 .../org/apache/kylin/query/util/QueryUtil.java  | 28 +++++++++++++
 .../apache/kylin/query/util/QueryUtilTest.java  | 39 ++++++++++++++++++
 .../apache/kylin/rest/service/QueryService.java | 42 +++++++++++++++++---
 8 files changed, 172 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index bca00e7..a113327 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1088,6 +1088,10 @@ abstract public class KylinConfigBase implements 
Serializable {
                 new String[] { 
"org.apache.kylin.source.adhocquery.HivePushDownConverter" });
     }
 
+    public boolean isPushdownQueryCacheEnabled() {
+        return 
Boolean.parseBoolean(this.getOptional("kylin.query.pushdown.cache-enabled", 
"false"));
+    }
+
     public String getJdbcUrl() {
         return getOptional("kylin.query.pushdown.jdbc.url", "");
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/HivePushDownConverter.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/HivePushDownConverter.java
 
b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/HivePushDownConverter.java
index bcd8608..eef4594 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/HivePushDownConverter.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/HivePushDownConverter.java
@@ -262,6 +262,10 @@ public class HivePushDownConverter implements 
IPushDownConverter {
         // Step7.Add quote for interval in timestampadd
         convertedSql = timestampaddReplace(convertedSql);
 
+        // Step8.Replace integer with int
+        convertedSql = replaceString(convertedSql, "INTEGER", "INT");
+        convertedSql = replaceString(convertedSql, "integer", "int");
+
         return convertedSql;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java
 
b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java
index c8d18aa..0336bfb 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java
@@ -36,4 +36,17 @@ public interface IPushDownRunner {
      * @throws Exception if running pushdown query fails
      */
     void executeQuery(String query, List<List<String>> returnRows, 
List<SelectedColumnMeta> returnColumnMeta) throws Exception;
+
+
+    /**
+     * Run an pushdown non-query sql
+     *
+     * @param sql                 the sql statement
+     *
+     * @return whether the SQL is executed successfully
+     *
+     * @throws Exception if running pushdown fails
+     */
+    boolean executeUpdate(String sql) throws Exception;
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
----------------------------------------------------------------------
diff --git 
a/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java 
b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
index 751e6b0..713629a 100644
--- 
a/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
+++ 
b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java
@@ -29,6 +29,7 @@ import java.util.List;
 
 import org.apache.commons.pool.impl.GenericObjectPool;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.DBUtils;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import org.apache.kylin.source.adhocquery.IPushDownRunner;
 
@@ -62,18 +63,15 @@ public class PushDownRunnerJdbcImpl implements 
IPushDownRunner {
         Connection connection = this.getConnection();
         ResultSet resultSet = null;
 
+        //extract column metadata
+        ResultSetMetaData metaData = null;
+        int columnCount = 0;
+
         try {
             statement = connection.createStatement();
             resultSet = statement.executeQuery(query);
             extractResults(resultSet, results);
-        } catch (SQLException sqlException) {
-            throw sqlException;
-        }
 
-        //extract column metadata
-        ResultSetMetaData metaData = null;
-        int columnCount = 0;
-        try {
             metaData = resultSet.getMetaData();
             columnCount = metaData.getColumnCount();
 
@@ -85,12 +83,32 @@ public class PushDownRunnerJdbcImpl implements 
IPushDownRunner {
                         metaData.getPrecision(i), metaData.getScale(i), 
metaData.getColumnType(i),
                         metaData.getColumnTypeName(i), metaData.isReadOnly(i), 
false, false));
             }
-
         } catch (SQLException sqlException) {
             throw sqlException;
+        } finally {
+            DBUtils.closeQuietly(resultSet);
+            DBUtils.closeQuietly(statement);
+            closeConnection(connection);
         }
+    }
+
+    @Override
+    public boolean executeUpdate(String sql) throws Exception {
+        Statement statement = null;
+        Connection connection = this.getConnection();
 
-        closeConnection(connection);
+        boolean success;
+        try {
+            statement = connection.createStatement();
+            statement.execute(sql);
+            success = true;
+        } catch (SQLException sqlException) {
+            throw sqlException;
+        } finally {
+            DBUtils.closeQuietly(statement);
+            closeConnection(connection);
+        }
+        return success;
     }
 
     private Connection getConnection() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java 
b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index ff88738..82321a4 100644
--- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -38,6 +38,7 @@ import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.util.SqlVisitor;
+import org.apache.calcite.sql.validate.SqlValidatorException;
 import org.apache.commons.lang.text.StrBuilder;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -61,10 +62,14 @@ public class PushDownUtil {
         if (!kylinConfig.isPushDownEnabled()) {
             return false;
         }
-
-        Throwable rootCause = ExceptionUtils.getRootCause(sqlException);
-        boolean isExpectedCause = rootCause != null && 
(rootCause.getClass().equals(NoRealizationFoundException.class));
-
+        boolean isSelect = QueryUtil.isSelectStatement(sql);
+        boolean isExpectedCause = true;
+        
+        if (sqlException != null) {
+            Throwable rootCause = ExceptionUtils.getRootCause(sqlException);
+            isExpectedCause = rootCause != null && 
((rootCause.getClass().equals(NoRealizationFoundException.class)) || 
(rootCause.getClass().equals(SqlValidatorException.class)));
+        }
+        
         if (isExpectedCause) {
 
             logger.info("Query failed to utilize pre-calculation, routing to 
other engines", sqlException);
@@ -72,15 +77,16 @@ public class PushDownUtil {
             runner.init(kylinConfig);
             logger.debug("Query Pushdown runner {}", runner);
 
-            //            String expandCC = restoreComputedColumnToExpr(sql, 
project);
-            //            if (!StringUtils.equals(expandCC, sql)) {
-            //                logger.info("computed column in sql is expanded 
to:  " + expandCC);
-            //            }
-
             // default schema in calcite does not apply to other engines.
             // since this is a universql requirement, it's not implemented as 
a converter
             if (defaultSchema != null && !defaultSchema.equals("DEFAULT")) {
-                String completed = schemaCompletion(sql, defaultSchema);
+                String completed = sql;
+                try {
+                    completed = schemaCompletion(sql, defaultSchema);
+                } catch (SqlParseException e) {
+                    // fail to parse the pushdown sql, ignore
+                    logger.debug("fail to do schema completion on the pushdown 
sql, ignore it.", e.getMessage());
+                }
                 if (!sql.equals(completed)) {
                     logger.info("the query is converted to {} after schema 
completion", completed);
                     sql = completed;
@@ -96,7 +102,11 @@ public class PushDownUtil {
                 }
             }
 
-            runner.executeQuery(sql, results, columnMetas);
+            if (isSelect == true) {
+                runner.executeQuery(sql, results, columnMetas);
+            } else {
+                runner.executeUpdate(sql);
+            }
             return true;
         } else {
             return false;

http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java 
b/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java
index 3796d44..2dff07c 100644
--- a/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java
+++ b/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java
@@ -197,4 +197,32 @@ public class QueryUtil {
         }
     }
 
+    public static boolean isSelectStatement(String sql) {
+        String sql1 = removeCommentInSql(sql);
+        return sql1.startsWith("select") || sql1.startsWith("with") && 
sql1.contains("select");
+    }
+
+    public static String removeCommentInSql(String sql) {
+        String sql1 = sql.toLowerCase();
+        // match two patterns, one is "-- comment", the other is "/* comment 
*/"
+        final String[] commentPatterns = new String[] {"--[^\r\n]*", 
"/\\*[^\\*/]*"};
+        final int[] endOffset = new int[] {0, 2};
+
+        for (int i = 0; i < commentPatterns.length; i++) {
+            String commentPattern = commentPatterns[i];
+            Pattern pattern = Pattern.compile(commentPattern);
+            Matcher matcher = pattern.matcher(sql1);
+
+            while (matcher.find()) {
+                if (matcher.start() == 0) {
+                    sql1 = sql1.substring(matcher.end() + endOffset[i]).trim();
+                } else if ((matcher.start() > 0 && sql1.charAt(matcher.start() 
- 1) != '\'')) {
+                    sql1 = (sql1.substring(0, matcher.start()) + 
sql1.substring(matcher.end() + endOffset[i])).trim();
+                }
+                matcher = pattern.matcher(sql1);
+            }
+        }
+
+        return sql1;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java 
b/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
index 942ef0b..46f5df4 100644
--- a/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
+++ b/query/src/test/java/org/apache/kylin/query/util/QueryUtilTest.java
@@ -60,4 +60,43 @@ public class QueryUtilTest extends LocalFileMetadataTestCase 
{
             Assert.assertEquals("select * from \"DEFAULT\".TEST_KYLIN_FACT", 
s);
         }
     }
+
+    @Test
+    public void testRemoveCommentInSql() {
+
+        String originSql =  "select count(*) from test_kylin_fact where price 
> 10.0";
+
+        {
+            String sqlWithComment = "-- comment \n" + originSql;
+
+            Assert.assertEquals(originSql, 
QueryUtil.removeCommentInSql(sqlWithComment));
+        }
+
+        {
+            String sqlWithComment = "-- comment \n -- comment\n" + originSql;
+            Assert.assertEquals(originSql, 
QueryUtil.removeCommentInSql(sqlWithComment));
+        }
+
+        {
+
+            String sqlWithComment = "-- \n -- comment \n" + originSql;
+            Assert.assertEquals(originSql, 
QueryUtil.removeCommentInSql(sqlWithComment));
+        }
+
+        {
+            String sqlWithComment = originSql + "-- \n -- comment \n";
+            Assert.assertEquals(originSql, 
QueryUtil.removeCommentInSql(sqlWithComment));
+        }
+
+        {
+            String sqlWithComment = "-- \n -- comment \n" + originSql + "-- \n 
-- comment \n";
+            Assert.assertEquals(originSql, 
QueryUtil.removeCommentInSql(sqlWithComment));
+        }
+
+        {
+            String sqlWithComment = "/* comment */ " + originSql + "-- \n -- 
comment \n";
+            Assert.assertEquals(originSql, 
QueryUtil.removeCommentInSql(sqlWithComment));
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/dbecb9b8/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 533b93d..c227d71 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -182,6 +182,29 @@ public class QueryService extends BasicService {
         }
     }
 
+
+    public SQLResponse update(SQLRequest sqlRequest) throws Exception {
+        // non select operations, only supported when enable pushdown
+        logger.debug("Query pushdown enabled, redirect the query to 
alternative engine. ");
+        Connection conn = null;
+        List<List<String>> results = Lists.newArrayList();
+        boolean isPushDown;
+        try {
+            conn = QueryConnection.getConnection(sqlRequest.getProject());
+            isPushDown = PushDownUtil.doPushDownQuery(sqlRequest.getProject(), 
sqlRequest.getSql(), conn.getSchema(), null, null, null);
+        } catch (Exception e) {
+            logger.error("failed to do pushdown, error is " + e.getMessage(), 
e);
+            throw new InternalErrorException(e);
+        } finally {
+            close(null, null, conn);
+        }
+        List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
+        columnMetas.add(new SelectedColumnMeta(false, false, false, false, 1, 
false, Integer.MAX_VALUE, "c0", "c0",
+                null, null, null, Integer.MAX_VALUE, 128, 1, "char", false, 
false, false));
+        SQLResponse sqlResponse = getSqlResponse(isPushDown, results, 
columnMetas);
+        return sqlResponse;
+    }
+
     public void saveQuery(final String creator, final Query query) throws 
IOException {
         List<Query> queries = getQueries(creator);
         queries.add(query);
@@ -367,11 +390,7 @@ public class QueryService extends BasicService {
             logger.info("Using project: " + project);
             logger.info("The original query:  " + sql);
 
-            if (!sql.toLowerCase().contains("select")
-                    && KylinConfig.getInstanceFromEnv().isPushDownEnabled() == 
false) {
-                logger.debug("Directly return exception as not supported");
-                throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
-            }
+            final boolean isSelect = QueryUtil.isSelectStatement(sql);
 
             long startTime = System.currentTimeMillis();
 
@@ -386,7 +405,15 @@ public class QueryService extends BasicService {
 
             try {
                 if (null == sqlResponse) {
-                    sqlResponse = query(sqlRequest);
+                    if (isSelect == true) {
+                        sqlResponse = query(sqlRequest);
+                    } else if (kylinConfig.isPushDownEnabled() == true) {
+                        sqlResponse = update(sqlRequest);
+                    } else {
+                        logger.debug(
+                                "Directly return exception as the sql is 
unsupported, and query pushdown is disabled");
+                        throw new 
BadRequestException(msg.getNOT_SUPPORTED_SQL());
+                    }
 
                     long durationThreshold = 
kylinConfig.getQueryDurationCacheThreshold();
                     long scanCountThreshold = 
kylinConfig.getQueryScanCountCacheThreshold();
@@ -397,6 +424,9 @@ public class QueryService extends BasicService {
                             String.valueOf(sqlResponse.getTotalScanCount()));
                     if (checkCondition(queryCacheEnabled, "query cache is 
disabled") //
                             && checkCondition(!sqlResponse.getIsException(), 
"query has exception") //
+                            && checkCondition(!(sqlResponse.isPushDown()
+                                    && (isSelect == false || 
kylinConfig.isPushdownQueryCacheEnabled() == false)),
+                                    "query is executed with pushdown, but it 
is non-select, or the cache for pushdown is disabled") //
                             && checkCondition(
                                     sqlResponse.getDuration() > 
durationThreshold
                                             || sqlResponse.getTotalScanCount() 
> scanCountThreshold

Reply via email to