Repository: flink
Updated Branches:
  refs/heads/master 9f928d1ec -> 7ce42c2e7


[FLINK-4281] [table] Wrap all Calcite Exceptions in Flink Exceptions

This closes #2372.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ce42c2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ce42c2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ce42c2e

Branch: refs/heads/master
Commit: 7ce42c2e7e332b0fafd19a3f9ed49e4554958fdd
Parents: 9f928d1
Author: Jark Wu <wuchong...@alibaba-inc.com>
Authored: Mon Aug 15 23:25:08 2016 +0800
Committer: twalthr <twal...@apache.org>
Committed: Wed Aug 17 16:46:23 2016 +0200

----------------------------------------------------------------------
 .../flink/api/table/FlinkPlannerImpl.scala      | 56 +++++++++++---------
 .../org/apache/flink/api/table/exceptions.scala | 12 +++++
 2 files changed, 43 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ce42c2e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
index f677784..bb08654 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
@@ -29,7 +29,7 @@ import org.apache.calcite.rel.RelRoot
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.RexBuilder
 import org.apache.calcite.schema.SchemaPlus
-import org.apache.calcite.sql.parser.{SqlParseException, SqlParser}
+import org.apache.calcite.sql.parser.{SqlParseException => CSqlParseException, 
SqlParser}
 import org.apache.calcite.sql.validate.SqlValidator
 import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, 
SqlToRelConverter}
@@ -55,9 +55,9 @@ class FlinkPlannerImpl(
   val convertletTable: SqlRexConvertletTable = config.getConvertletTable
   val defaultSchema: SchemaPlus = config.getDefaultSchema
 
-  var validator: FlinkCalciteSqlValidator = null
-  var validatedSqlNode: SqlNode = null
-  var root: RelRoot = null
+  var validator: FlinkCalciteSqlValidator = _
+  var validatedSqlNode: SqlNode = _
+  var root: RelRoot = _
 
   private def ready() {
     if (this.traitDefs != null) {
@@ -68,15 +68,18 @@ class FlinkPlannerImpl(
     }
   }
 
-  @throws(classOf[SqlParseException])
   def parse(sql: String): SqlNode = {
-    ready()
-    val parser: SqlParser = SqlParser.create(sql, parserConfig)
-    val sqlNode: SqlNode = parser.parseStmt
-    sqlNode
+    try {
+      ready()
+      val parser: SqlParser = SqlParser.create(sql, parserConfig)
+      val sqlNode: SqlNode = parser.parseStmt
+      sqlNode
+    } catch {
+      case e: CSqlParseException =>
+        throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
+    }
   }
 
-  @throws(classOf[ValidationException])
   def validate(sqlNode: SqlNode): SqlNode = {
     validator = new FlinkCalciteSqlValidator(operatorTable, 
createCatalogReader, typeFactory)
     validator.setIdentifierExpansion(true)
@@ -85,24 +88,27 @@ class FlinkPlannerImpl(
     }
     catch {
       case e: RuntimeException =>
-        throw new ValidationException(s"SQL validation failed.", e)
+        throw new ValidationException(s"SQL validation failed. 
${e.getMessage}", e)
     }
     validatedSqlNode
   }
 
-  @throws(classOf[RelConversionException])
   def rel(sql: SqlNode): RelRoot = {
-    assert(validatedSqlNode != null)
-    val rexBuilder: RexBuilder = createRexBuilder
-    val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
-    val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
-      new ViewExpanderImpl, validator, createCatalogReader, cluster, 
convertletTable)
-    sqlToRelConverter.setTrimUnusedFields(false)
-    sqlToRelConverter.enableTableAccessConversion(false)
-    root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
-    root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
-    root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
-    root
+    try {
+      assert(validatedSqlNode != null)
+      val rexBuilder: RexBuilder = createRexBuilder
+      val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+      val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
+        new ViewExpanderImpl, validator, createCatalogReader, cluster, 
convertletTable)
+      sqlToRelConverter.setTrimUnusedFields(false)
+      sqlToRelConverter.enableTableAccessConversion(false)
+      root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
+      root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
+      root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+      root
+    } catch {
+      case e: RelConversionException => throw TableException(e.getMessage)
+    }
   }
 
   /** Implements [[org.apache.calcite.plan.RelOptTable.ViewExpander]]
@@ -120,8 +126,8 @@ class FlinkPlannerImpl(
         sqlNode = parser.parseQuery
       }
       catch {
-        case e: SqlParseException =>
-          throw new RuntimeException("parse failed", e)
+        case e: CSqlParseException =>
+          throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
       }
       val catalogReader: CalciteCatalogReader = 
createCatalogReader.withSchemaPath(schemaPath)
       val validator: SqlValidator =

http://git-wip-us.apache.org/repos/asf/flink/blob/7ce42c2e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
index cd5ce6a..773e256 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
@@ -24,6 +24,18 @@ package org.apache.flink.api.table
 case class ExpressionParserException(msg: String) extends RuntimeException(msg)
 
 /**
+  * Exception for all errors occurring during sql parsing.
+  */
+case class SqlParserException(
+    msg: String,
+    cause: Throwable)
+  extends RuntimeException(msg, cause) {
+
+  def this(msg: String) = this(msg, null)
+
+}
+
+/**
   * General Exception for all errors during table handling.
   */
 case class TableException(msg: String) extends RuntimeException(msg)

Reply via email to