Repository: flink Updated Branches: refs/heads/master 9f928d1ec -> 7ce42c2e7
[FLINK-4281] [table] Wrap all Calcite Exceptions in Flink Exceptions This closes #2372. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ce42c2e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ce42c2e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ce42c2e Branch: refs/heads/master Commit: 7ce42c2e7e332b0fafd19a3f9ed49e4554958fdd Parents: 9f928d1 Author: Jark Wu <wuchong...@alibaba-inc.com> Authored: Mon Aug 15 23:25:08 2016 +0800 Committer: twalthr <twal...@apache.org> Committed: Wed Aug 17 16:46:23 2016 +0200 ---------------------------------------------------------------------- .../flink/api/table/FlinkPlannerImpl.scala | 56 +++++++++++--------- .../org/apache/flink/api/table/exceptions.scala | 12 +++++ 2 files changed, 43 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7ce42c2e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala index f677784..bb08654 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala @@ -29,7 +29,7 @@ import org.apache.calcite.rel.RelRoot import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex.RexBuilder import org.apache.calcite.schema.SchemaPlus -import org.apache.calcite.sql.parser.{SqlParseException, SqlParser} +import org.apache.calcite.sql.parser.{SqlParseException => CSqlParseException, SqlParser} import org.apache.calcite.sql.validate.SqlValidator import org.apache.calcite.sql.{SqlNode, SqlOperatorTable} import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter} @@ -55,9 +55,9 @@ class FlinkPlannerImpl( val convertletTable: SqlRexConvertletTable = config.getConvertletTable val defaultSchema: SchemaPlus = config.getDefaultSchema - var validator: FlinkCalciteSqlValidator = null - var validatedSqlNode: SqlNode = null - var root: RelRoot = null + var validator: FlinkCalciteSqlValidator = _ + var validatedSqlNode: SqlNode = _ + var root: RelRoot = _ private def ready() { if (this.traitDefs != null) { @@ -68,15 +68,18 @@ class FlinkPlannerImpl( } } - @throws(classOf[SqlParseException]) def parse(sql: String): SqlNode = { - ready() - val parser: SqlParser = SqlParser.create(sql, parserConfig) - val sqlNode: SqlNode = parser.parseStmt - sqlNode + try { + ready() + val parser: SqlParser = SqlParser.create(sql, parserConfig) + val sqlNode: SqlNode = parser.parseStmt + sqlNode + } catch { + case e: CSqlParseException => + throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e) + } } - @throws(classOf[ValidationException]) def validate(sqlNode: SqlNode): SqlNode = { validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory) validator.setIdentifierExpansion(true) @@ -85,24 +88,27 @@ class FlinkPlannerImpl( } catch { case e: RuntimeException => - throw new ValidationException(s"SQL validation failed.", e) + throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e) } validatedSqlNode } - @throws(classOf[RelConversionException]) def rel(sql: SqlNode): RelRoot = { - assert(validatedSqlNode != null) - val rexBuilder: RexBuilder = createRexBuilder - val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder) - val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter( - new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable) - sqlToRelConverter.setTrimUnusedFields(false) - sqlToRelConverter.enableTableAccessConversion(false) - root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true) - root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)) - root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel)) - root + try { + assert(validatedSqlNode != null) + val rexBuilder: RexBuilder = createRexBuilder + val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder) + val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter( + new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable) + sqlToRelConverter.setTrimUnusedFields(false) + sqlToRelConverter.enableTableAccessConversion(false) + root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true) + root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)) + root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel)) + root + } catch { + case e: RelConversionException => throw TableException(e.getMessage) + } } /** Implements [[org.apache.calcite.plan.RelOptTable.ViewExpander]] @@ -120,8 +126,8 @@ class FlinkPlannerImpl( sqlNode = parser.parseQuery } catch { - case e: SqlParseException => - throw new RuntimeException("parse failed", e) + case e: CSqlParseException => + throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e) } val catalogReader: CalciteCatalogReader = createCatalogReader.withSchemaPath(schemaPath) val validator: SqlValidator = http://git-wip-us.apache.org/repos/asf/flink/blob/7ce42c2e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala index cd5ce6a..773e256 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala @@ -24,6 +24,18 @@ package org.apache.flink.api.table case class ExpressionParserException(msg: String) extends RuntimeException(msg) /** + * Exception for all errors occurring during sql parsing. + */ +case class SqlParserException( + msg: String, + cause: Throwable) + extends RuntimeException(msg, cause) { + + def this(msg: String) = this(msg, null) + +} + +/** * General Exception for all errors during table handling. */ case class TableException(msg: String) extends RuntimeException(msg)