minor, refactor pushdown code
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/068baf6b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/068baf6b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/068baf6b Branch: refs/heads/master Commit: 068baf6b3ec13fe6e68e9a41593dcd139b4a8597 Parents: a4aeb77 Author: Li Yang <liy...@apache.org> Authored: Wed Aug 30 21:18:19 2017 +0800 Committer: Roger Shi <rogershijich...@gmail.com> Committed: Thu Aug 31 10:37:32 2017 +0800 ---------------------------------------------------------------------- .../kylin/common/debug/BackdoorToggles.java | 19 +++- .../source/adhocquery/IPushDownRunner.java | 2 +- .../org/apache/kylin/query/KylinTestBase.java | 11 +- .../query/adhoc/PushDownRunnerJdbcImpl.java | 5 +- .../apache/kylin/query/relnode/OLAPContext.java | 11 +- .../apache/kylin/query/relnode/OLAPJoinRel.java | 1 + .../relnode/OLAPToEnumerableConverter.java | 61 ++--------- .../routing/RoutingIndicatorException.java | 35 +++++++ .../apache/kylin/query/util/PushDownUtil.java | 104 ++++++++++--------- .../apache/kylin/rest/service/QueryService.java | 37 ++++--- 10 files changed, 145 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java index 8cb48b6..88c7c16 100644 --- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java +++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java @@ -40,6 +40,14 @@ public class BackdoorToggles { _backdoorToggles.set(toggles); } + public static void addToggle(String key, String value) { + Map<String, String> map = _backdoorToggles.get(); + if (map == null) { + setToggles(Maps.<String, String> newHashMap()); + } + _backdoorToggles.get().put(key, value); + } + public static void addToggles(Map<String, String> toggles) { Map<String, String> map = _backdoorToggles.get(); if (map == null) { @@ -47,6 +55,15 @@ public class BackdoorToggles { } _backdoorToggles.get().putAll(toggles); } + + // try avoid using this generic method + public static String getToggle(String key) { + Map<String, String> map = _backdoorToggles.get(); + if (map == null) + return null; + + return map.get(key); + } public static String getCoprocessorBehavior() { return getString(DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR); @@ -109,7 +126,7 @@ public class BackdoorToggles { public static boolean getPrepareOnly() { return getBoolean(DEBUG_TOGGLE_PREPARE_ONLY); } - + private static String getString(String key) { Map<String, String> toggles = _backdoorToggles.get(); if (toggles == null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java index 0336bfb..9983f5c 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/adhocquery/IPushDownRunner.java @@ -47,6 +47,6 @@ public interface IPushDownRunner { * * @throws Exception if running pushdown fails */ - boolean executeUpdate(String sql) throws Exception; + void executeUpdate(String sql) throws Exception; } http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java index 8f143c7..d3db995 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java @@ -70,7 +70,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import com.google.common.io.Files; /** @@ -261,14 +260,12 @@ public class KylinTestBase { return output(resultSet, needDisplay); } catch (SQLException sqlException) { - List<List<String>> results = Lists.newArrayList(); - List<SelectedColumnMeta> columnMetas = Lists.newArrayList(); - boolean b = PushDownUtil.doPushDownQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", results, - columnMetas, sqlException); - if (!b) { + Pair<List<List<String>>, List<SelectedColumnMeta>> result = PushDownUtil + .tryPushDownQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", sqlException); + if (result == null) { throw sqlException; } - return results.size(); + return result.getFirst().size(); } finally { if (resultSet != null) { try { http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java index 713629a..8c701c1 100644 --- a/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java +++ b/query/src/main/java/org/apache/kylin/query/adhoc/PushDownRunnerJdbcImpl.java @@ -93,22 +93,19 @@ public class PushDownRunnerJdbcImpl implements IPushDownRunner { } @Override - public boolean executeUpdate(String sql) throws Exception { + public void executeUpdate(String sql) throws Exception { Statement statement = null; Connection connection = this.getConnection(); - boolean success; try { statement = connection.createStatement(); statement.execute(sql); - success = true; } catch (SQLException sqlException) { throw sqlException; } finally { DBUtils.closeQuietly(statement); closeConnection(connection); } - return success; } private Connection getConnection() { http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java index 3a42ddb..d1608e9 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java @@ -29,6 +29,7 @@ import java.util.Set; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.FunctionDesc; @@ -111,6 +112,7 @@ public class OLAPContext { public OLAPSchema olapSchema = null; public OLAPTableScan firstTableScan = null; // to be fact table scan except "select * from lookupTable" public Set<OLAPTableScan> allTableScans = new HashSet<>(); + public Set<OLAPJoinRel> allOlapJoins = new HashSet<>(); public Set<MeasureDesc> involvedMeasure = new HashSet<>(); public TupleInfo returnTupleInfo = null; public boolean afterAggregate = false; @@ -203,14 +205,7 @@ public class OLAPContext { // ============================================================================ public interface IAccessController { - /* - * @return {TupleFilter} if the filter condition exists - * @OLAPAuthentication the authentication info - * @columns required columns from logic query plan - * @realization the cube used in this query - * @OLAPInsufficientException no rights exception - */ - public TupleFilter check(OLAPAuthentication olapAuthentication, Collection<TblColRef> columns, IRealization realization) throws IllegalArgumentException; + public void check(List<OLAPContext> contexts, KylinConfig config) throws IllegalStateException; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java index a27cf76..3b5c3cf 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPJoinRel.java @@ -124,6 +124,7 @@ public class OLAPJoinRel extends EnumerableJoin implements OLAPRel { } this.context = implementor.getContext(); + this.context.allOlapJoins.add(this); this.isTopJoin = !this.context.hasJoin; this.context.hasJoin = true; http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java index 7ac86b2..c7b0fe2 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java @@ -19,36 +19,24 @@ package org.apache.kylin.query.relnode; import java.util.List; -import java.util.Set; import org.apache.calcite.adapter.enumerable.EnumerableRel; import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; -import org.apache.calcite.adapter.enumerable.PhysType; -import org.apache.calcite.adapter.enumerable.PhysTypeImpl; -import org.apache.calcite.linq4j.tree.Blocks; -import org.apache.calcite.linq4j.tree.Expressions; import org.apache.calcite.plan.ConventionTraitDef; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; -import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.convert.ConverterImpl; import org.apache.calcite.rel.metadata.RelMetadataQuery; -import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.SqlExplainLevel; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.metadata.filter.ColumnTupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.query.routing.RealizationChooser; -import org.apache.kylin.query.schema.OLAPTable; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; /** */ @@ -81,14 +69,11 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements Enumerab OLAPRel.OLAPImplementor olapImplementor = new OLAPRel.OLAPImplementor(); olapImplementor.visitChild(getInput(), this); - // identify model + // identify model & realization List<OLAPContext> contexts = listContextsHavingScan(); RealizationChooser.selectRealization(contexts); - // identify realization for each context - for (OLAPContext context : contexts) { - doAccessControl(context); - } + doAccessControl(contexts); // rewrite query if necessary OLAPRel.RewriteImplementor rewriteImplementor = new OLAPRel.RewriteImplementor(); @@ -122,47 +107,13 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements Enumerab return result; } - private void doAccessControl(OLAPContext context) { - String controllerCls = KylinConfig.getInstanceFromEnv().getQueryAccessController(); + private void doAccessControl(List<OLAPContext> contexts) { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + String controllerCls = config.getQueryAccessController(); if (null != controllerCls && !controllerCls.isEmpty()) { OLAPContext.IAccessController accessController = (OLAPContext.IAccessController) ClassUtil.newInstance(controllerCls); - TupleFilter tupleFilter = accessController.check(context.olapAuthen, context.allColumns, context.realization); - if (null != tupleFilter) { - context.filterColumns.addAll(collectColumns(tupleFilter)); - context.allColumns.addAll(collectColumns(tupleFilter)); - context.filter = TupleFilter.and(context.filter, tupleFilter); - } - } - } - - private Set<TblColRef> collectColumns(TupleFilter filter) { - Set<TblColRef> ret = Sets.newHashSet(); - collectColumnsRecursively(filter, ret); - return ret; - } - - private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) { - if (filter == null) - return; - - if (filter instanceof ColumnTupleFilter) { - collector.add(((ColumnTupleFilter) filter).getColumn()); + accessController.check(contexts, config); } - for (TupleFilter child : filter.getChildren()) { - collectColumnsRecursively(child, collector); - } - } - - @SuppressWarnings("unused") - private Result buildHiveResult(EnumerableRelImplementor enumImplementor, Prefer pref, OLAPContext context) { - RelDataType hiveRowType = getRowType(); - - context.setReturnTupleInfo(hiveRowType, null); - PhysType physType = PhysTypeImpl.of(enumImplementor.getTypeFactory(), hiveRowType, pref.preferArray()); - - RelOptTable factTable = context.firstTableScan.getTable(); - Result result = enumImplementor.result(physType, Blocks.toBlock(Expressions.call(factTable.getExpression(OLAPTable.class), "executeHiveQuery", enumImplementor.getRootExpression()))); - return result; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/query/src/main/java/org/apache/kylin/query/routing/RoutingIndicatorException.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingIndicatorException.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingIndicatorException.java new file mode 100644 index 0000000..a655455 --- /dev/null +++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingIndicatorException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.query.routing; + +/** + * A special exception serves as a routing indicator. + */ +public class RoutingIndicatorException extends RuntimeException { + + private static final long serialVersionUID = 7631508437415520091L; + + public RoutingIndicatorException(String message, Throwable t) { + super(message, t); + } + + public RoutingIndicatorException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java index 82321a4..9262b20 100644 --- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java +++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java @@ -47,70 +47,78 @@ import org.apache.kylin.common.util.Pair; import org.apache.kylin.metadata.model.tool.CalciteParser; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.apache.kylin.query.routing.NoRealizationFoundException; +import org.apache.kylin.query.routing.RoutingIndicatorException; import org.apache.kylin.source.adhocquery.IPushDownConverter; import org.apache.kylin.source.adhocquery.IPushDownRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + public class PushDownUtil { private static final Logger logger = LoggerFactory.getLogger(PushDownUtil.class); - public static boolean doPushDownQuery(String project, String sql, String defaultSchema, List<List<String>> results, - List<SelectedColumnMeta> columnMetas, SQLException sqlException) throws Exception { + public static Pair<List<List<String>>, List<SelectedColumnMeta>> tryPushDownQuery(String project, String sql, + String defaultSchema, SQLException sqlException) throws Exception { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - if (!kylinConfig.isPushDownEnabled()) { - return false; - } - boolean isSelect = QueryUtil.isSelectStatement(sql); - boolean isExpectedCause = true; - - if (sqlException != null) { - Throwable rootCause = ExceptionUtils.getRootCause(sqlException); - isExpectedCause = rootCause != null && ((rootCause.getClass().equals(NoRealizationFoundException.class)) || (rootCause.getClass().equals(SqlValidatorException.class))); - } - - if (isExpectedCause) { - - logger.info("Query failed to utilize pre-calculation, routing to other engines", sqlException); - IPushDownRunner runner = (IPushDownRunner) ClassUtil.newInstance(kylinConfig.getPushDownRunnerClassName()); - runner.init(kylinConfig); - logger.debug("Query Pushdown runner {}", runner); - - // default schema in calcite does not apply to other engines. - // since this is a universql requirement, it's not implemented as a converter - if (defaultSchema != null && !defaultSchema.equals("DEFAULT")) { - String completed = sql; - try { - completed = schemaCompletion(sql, defaultSchema); - } catch (SqlParseException e) { - // fail to parse the pushdown sql, ignore - logger.debug("fail to do schema completion on the pushdown sql, ignore it.", e.getMessage()); - } - if (!sql.equals(completed)) { - logger.info("the query is converted to {} after schema completion", completed); - sql = completed; - } - } - for (String converterName : kylinConfig.getPushDownConverterClassNames()) { - IPushDownConverter converter = (IPushDownConverter) ClassUtil.newInstance(converterName); - String converted = converter.convert(sql, project, defaultSchema); - if (!sql.equals(converted)) { - logger.info("the query is converted to {} after applying converter {}", converted, converterName); - sql = converted; - } + if (!kylinConfig.isPushDownEnabled()) + return null; + + if (!isExpectedCause(sqlException)) + return null; + + logger.info("Query failed to utilize pre-calculation, routing to other engines", sqlException); + IPushDownRunner runner = (IPushDownRunner) ClassUtil.newInstance(kylinConfig.getPushDownRunnerClassName()); + runner.init(kylinConfig); + logger.debug("Query Pushdown runner {}", runner); + + // default schema in calcite does not apply to other engines. + // since this is a universql requirement, it's not implemented as a converter + if (defaultSchema != null && !defaultSchema.equals("DEFAULT")) { + String completed = sql; + try { + completed = schemaCompletion(sql, defaultSchema); + } catch (SqlParseException e) { + // fail to parse the pushdown sql, ignore + logger.debug("fail to do schema completion on the pushdown sql, ignore it.", e.getMessage()); } + if (!sql.equals(completed)) { + logger.info("the query is converted to {} after schema completion", completed); + sql = completed; + } + } - if (isSelect == true) { - runner.executeQuery(sql, results, columnMetas); - } else { - runner.executeUpdate(sql); + for (String converterName : kylinConfig.getPushDownConverterClassNames()) { + IPushDownConverter converter = (IPushDownConverter) ClassUtil.newInstance(converterName); + String converted = converter.convert(sql, project, defaultSchema); + if (!sql.equals(converted)) { + logger.info("the query is converted to {} after applying converter {}", converted, converterName); + sql = converted; } - return true; + } + + List<List<String>> returnRows = Lists.newArrayList(); + List<SelectedColumnMeta> returnColumnMeta = Lists.newArrayList(); + + if (QueryUtil.isSelectStatement(sql)) { + runner.executeQuery(sql, returnRows, returnColumnMeta); } else { - return false; + runner.executeUpdate(sql); } + return Pair.newPair(returnRows, returnColumnMeta); + } + + private static boolean isExpectedCause(SQLException sqlException) { + Preconditions.checkArgument(sqlException != null); + + Throwable rootCause = ExceptionUtils.getRootCause(sqlException); + return rootCause != null && // + (rootCause instanceof NoRealizationFoundException // + || rootCause instanceof SqlValidatorException // + || rootCause instanceof RoutingIndicatorException); } static String schemaCompletion(String inputSql, String schema) throws SqlParseException { http://git-wip-us.apache.org/repos/asf/kylin/blob/068baf6b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index c227d71..5ac595f 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -63,6 +63,7 @@ import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.util.DBUtils; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -187,22 +188,18 @@ public class QueryService extends BasicService { // non select operations, only supported when enable pushdown logger.debug("Query pushdown enabled, redirect the query to alternative engine. "); Connection conn = null; - List<List<String>> results = Lists.newArrayList(); - boolean isPushDown; try { conn = QueryConnection.getConnection(sqlRequest.getProject()); - isPushDown = PushDownUtil.doPushDownQuery(sqlRequest.getProject(), sqlRequest.getSql(), conn.getSchema(), null, null, null); + Pair<List<List<String>>, List<SelectedColumnMeta>> r = PushDownUtil + .tryPushDownQuery(sqlRequest.getProject(), sqlRequest.getSql(), conn.getSchema(), null); + return buildSqlResponse(true, r.getFirst(), r.getSecond()); + } catch (Exception e) { logger.error("failed to do pushdown, error is " + e.getMessage(), e); throw new InternalErrorException(e); } finally { close(null, null, conn); } - List<SelectedColumnMeta> columnMetas = Lists.newArrayList(); - columnMetas.add(new SelectedColumnMeta(false, false, false, false, 1, false, Integer.MAX_VALUE, "c0", "c0", - null, null, null, Integer.MAX_VALUE, 128, 1, "char", false, false, false)); - SQLResponse sqlResponse = getSqlResponse(isPushDown, results, columnMetas); - return sqlResponse; } public void saveQuery(final String creator, final Query query) throws IOException { @@ -791,7 +788,7 @@ public class QueryService extends BasicService { private SQLResponse execute(String correctedSql, SQLRequest sqlRequest, Connection conn) throws Exception { Statement stat = null; ResultSet resultSet = null; - Boolean isPushDown = false; + boolean isPushDown = false; List<List<String>> results = Lists.newArrayList(); List<SelectedColumnMeta> columnMetas = Lists.newArrayList(); @@ -830,17 +827,22 @@ public class QueryService extends BasicService { results.add(oneRow); } + } catch (SQLException sqlException) { - isPushDown = PushDownUtil.doPushDownQuery(sqlRequest.getProject(), correctedSql, conn.getSchema(), results, - columnMetas, sqlException); - if (!isPushDown) { + Pair<List<List<String>>, List<SelectedColumnMeta>> r = PushDownUtil + .tryPushDownQuery(sqlRequest.getProject(), correctedSql, conn.getSchema(), sqlException); + if (r == null) throw sqlException; - } + + isPushDown = true; + results = r.getFirst(); + columnMetas = r.getSecond(); + } finally { - close(resultSet, stat, null);//conn is passed in, not my duty to close + close(resultSet, stat, null); //conn is passed in, not my duty to close } - return getSqlResponse(isPushDown, results, columnMetas); + return buildSqlResponse(isPushDown, results, columnMetas); } private SQLResponse getPrepareOnlySqlResponse(String correctedSql, Connection conn, Boolean isPushDown, @@ -883,10 +885,10 @@ public class QueryService extends BasicService { CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(false); } - return getSqlResponse(isPushDown, results, columnMetas); + return buildSqlResponse(isPushDown, results, columnMetas); } - private SQLResponse getSqlResponse(Boolean isPushDown, List<List<String>> results, + private SQLResponse buildSqlResponse(Boolean isPushDown, List<List<String>> results, List<SelectedColumnMeta> columnMetas) { boolean isPartialResult = false; @@ -918,6 +920,7 @@ public class QueryService extends BasicService { * @param param * @throws SQLException */ + @SuppressWarnings("unused") private void setParam(PreparedStatement preparedState, int index, PrepareSqlRequest.StateParam param) throws SQLException { boolean isNull = (null == param.getValue());