Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite 20127ab71 -> 663101325


[FLINK-3226] Translation of scalar function substring()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e220745
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e220745
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e220745

Branch: refs/heads/tableOnCalcite
Commit: 3e22074553cedde1c3a1754d91d26a9f35ac3c2f
Parents: 1c53c87
Author: twalthr <twal...@apache.org>
Authored: Sat Feb 20 21:41:44 2016 +0100
Committer: twalthr <twal...@apache.org>
Committed: Thu Feb 25 10:32:41 2016 +0100

----------------------------------------------------------------------
 .../flink/api/scala/table/expressionDsl.scala   |   8 +-
 .../flink/api/table/codegen/CodeGenerator.scala |  19 +-
 .../api/table/codegen/OperatorCodeGen.scala     | 462 ------------------
 .../api/table/codegen/calls/CallGenerator.scala |  30 ++
 .../codegen/calls/MethodCallGenerator.scala     |  62 +++
 .../table/codegen/calls/ScalarFunctions.scala   |  72 +++
 .../table/codegen/calls/ScalarOperators.scala   | 463 +++++++++++++++++++
 .../table/expressions/stringExpressions.scala   |  21 +-
 .../api/table/parser/ExpressionParser.scala     |   4 +-
 .../api/table/plan/RexNodeTranslator.scala      |  35 +-
 .../flink/api/java/table/test/AsITCase.java     |   4 +-
 .../api/java/table/test/CastingITCase.java      |   2 +-
 .../api/java/table/test/ExpressionsITCase.java  |   2 +-
 .../flink/api/java/table/test/FilterITCase.java |   2 +-
 .../flink/api/java/table/test/SelectITCase.java |   2 +-
 .../table/test/StringExpressionsITCase.java     |  10 +-
 .../flink/api/scala/table/test/AsITCase.scala   |   5 +-
 .../scala/table/test/ExpressionsITCase.scala    |   5 +-
 .../api/scala/table/test/FilterITCase.scala     |   5 +-
 .../api/scala/table/test/SelectITCase.scala     |   5 +-
 .../table/test/StringExpressionsITCase.scala    |  14 +-
 .../api/table/test/ScalarFunctionsTest.scala    |  96 ++++
 .../api/table/test/TableProgramsTestBase.scala  |  97 ----
 .../table/test/utils/ExpressionEvaluator.scala  |  93 ++++
 .../test/utils/TableProgramsTestBase.scala      |  97 ++++
 25 files changed, 1006 insertions(+), 609 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 058ff0e..68914da 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -63,8 +63,12 @@ trait ImplicitExpressionOperations {
   def count = Count(expr)
   def avg = Avg(expr)
 
-  def substring(beginIndex: Expression, endIndex: Expression = 
Literal(Int.MaxValue)) = {
-    Substring(expr, beginIndex, endIndex)
+  def substring(beginIndex: Expression, endIndex: Expression) = {
+    Substring(expr, beginIndex, Some(endIndex))
+  }
+
+  def substring(beginIndex: Expression) = {
+    Substring(expr, beginIndex)
   }
 
   def cast(toType: TypeInformation[_]) = Cast(expr, toType)

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index add5627..a052618 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -19,9 +19,10 @@
 package org.apache.flink.api.table.codegen
 
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
-import org.apache.flink.api.common.functions.{FlatMapFunction, Function, 
MapFunction}
+import org.apache.flink.api.common.functions.{FlatJoinFunction, 
FlatMapFunction, Function, MapFunction}
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
@@ -29,12 +30,13 @@ import 
org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.table.TableConfig
 import org.apache.flink.api.table.codegen.CodeGenUtils._
 import org.apache.flink.api.table.codegen.Indenter.toISC
-import org.apache.flink.api.table.codegen.OperatorCodeGen._
+import org.apache.flink.api.table.codegen.calls.ScalarFunctions
+import org.apache.flink.api.table.codegen.calls.ScalarOperators._
 import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo
 import org.apache.flink.api.table.typeinfo.RowTypeInfo
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import org.apache.flink.api.common.functions.FlatJoinFunction
 
 /**
   * A code generator for generating Flink 
[[org.apache.flink.api.common.functions.Function]]s.
@@ -540,7 +542,7 @@ class CodeGenerator(
         }
       case INTEGER =>
         val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
-        if (decimal.isValidShort) {
+        if (decimal.isValidInt) {
           generateNonNullLiteral(resultType, decimal.intValue().toString)
         }
         else {
@@ -702,10 +704,19 @@ class CodeGenerator(
         requireBoolean(operand)
         generateNot(nullCheck, operand)
 
+      // casting
       case CAST =>
         val operand = operands.head
         generateCast(nullCheck, operand, resultType)
 
+      // advanced scalar functions
+      case call: SqlOperator =>
+        val callGen = ScalarFunctions.getCallGenerator(call, 
operands.map(_.resultType))
+        callGen
+          .getOrElse(throw new CodeGenException(s"Unsupported call: $call"))
+          .generate(this, operands)
+
+      // unknown or invalid
       case call@_ =>
         throw new CodeGenException(s"Unsupported call: $call")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
deleted file mode 100644
index a61bbe4..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
+++ /dev/null
@@ -1,462 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, BasicTypeInfo, 
TypeInformation}
-import org.apache.flink.api.table.codegen.CodeGenUtils._
-
-object OperatorCodeGen {
-
-  def generateArithmeticOperator(
-      operator: String,
-      nullCheck: Boolean,
-      resultType: TypeInformation[_],
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    // String arithmetic // TODO rework
-    if (isString(left)) {
-      generateOperatorIfNotNull(nullCheck, resultType, left, right) {
-      (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
-      }
-    }
-    // Numeric arithmetic
-    else if (isNumeric(left) && isNumeric(right)) {
-      val leftType = left.resultType.asInstanceOf[NumericTypeInfo[_]]
-      val rightType = right.resultType.asInstanceOf[NumericTypeInfo[_]]
-      val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-
-      generateOperatorIfNotNull(nullCheck, resultType, left, right) {
-      (leftTerm, rightTerm) =>
-        // no casting required
-        if (leftType == resultType && rightType == resultType) {
-          s"$leftTerm $operator $rightTerm"
-        }
-        // left needs casting
-        else if (leftType != resultType && rightType == resultType) {
-          s"(($resultTypeTerm) $leftTerm) $operator $rightTerm"
-        }
-        // right needs casting
-        else if (leftType == resultType && rightType != resultType) {
-          s"$leftTerm $operator (($resultTypeTerm) $rightTerm)"
-        }
-        // both sides need casting
-        else {
-          s"(($resultTypeTerm) $leftTerm) $operator (($resultTypeTerm) 
$rightTerm)"
-        }
-      }
-    }
-    else {
-      throw new CodeGenException("Unsupported arithmetic operation.")
-    }
-  }
-
-  def generateUnaryArithmeticOperator(
-      operator: String,
-      nullCheck: Boolean,
-      resultType: TypeInformation[_],
-      operand: GeneratedExpression)
-    : GeneratedExpression = {
-    generateUnaryOperatorIfNotNull(nullCheck, resultType, operand) {
-      (operandTerm) => s"$operator($operandTerm)"
-    }
-  }
-
-  def generateEquals(
-      nullCheck: Boolean,
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
-      if (isReference(left)) {
-        (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)"
-      }
-      else if (isReference(right)) {
-        (leftTerm, rightTerm) => s"$rightTerm.equals($leftTerm)"
-      }
-      else {
-        (leftTerm, rightTerm) => s"$leftTerm == $rightTerm"
-      }
-    }
-  }
-
-  def generateNotEquals(
-      nullCheck: Boolean,
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
-      if (isReference(left)) {
-        (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))"
-      }
-      else if (isReference(right)) {
-        (leftTerm, rightTerm) => s"!($rightTerm.equals($leftTerm))"
-      }
-      else {
-        (leftTerm, rightTerm) => s"$leftTerm != $rightTerm"
-      }
-    }
-  }
-
-  def generateComparison(
-      operator: String,
-      nullCheck: Boolean,
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
-      if (isString(left) && isString(right)) {
-        (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) $operator 0"
-      }
-      else if (isNumeric(left) && isNumeric(right)) {
-        (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
-      }
-      else {
-        throw new CodeGenException("Comparison is only supported for Strings 
and numeric types.")
-      }
-    }
-  }
-
-  def generateIsNull(
-      nullCheck: Boolean,
-      operand: GeneratedExpression)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val operatorCode = if (nullCheck) {
-      s"""
-        |${operand.code}
-        |boolean $resultTerm = ${operand.nullTerm};
-        |boolean $nullTerm = false;
-        |""".stripMargin
-    }
-    else if (!nullCheck && isReference(operand.resultType)) {
-      s"""
-        |${operand.code}
-        |boolean $resultTerm = ${operand.resultTerm} == null;
-        |boolean $nullTerm = false;
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${operand.code}
-        |boolean $resultTerm = false;
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
-  }
-
-  def generateIsNotNull(
-      nullCheck: Boolean,
-      operand: GeneratedExpression)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val operatorCode = if (nullCheck) {
-      s"""
-        |${operand.code}
-        |boolean $resultTerm = !${operand.nullTerm};
-        |boolean $nullTerm = false;
-        |""".stripMargin
-    }
-    else if (!nullCheck && isReference(operand.resultType)) {
-      s"""
-        |${operand.code}
-        |boolean $resultTerm = ${operand.resultTerm} != null;
-        |boolean $nullTerm = false;
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${operand.code}
-        |boolean $resultTerm = true;
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
-  }
-
-  def generateAnd(
-      nullCheck: Boolean,
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-
-    val operatorCode = if (nullCheck) {
-      // Three-valued logic:
-      // no Unknown -> Two-valued logic
-      // True && Unknown -> Unknown
-      // False && Unknown -> False
-      // Unknown && True -> Unknown
-      // Unknown && False -> False
-      // Unknown && Unknown -> Unknown
-      s"""
-        |${left.code}
-        |${right.code}
-        |boolean $resultTerm;
-        |boolean $nullTerm;
-        |if (!${left.nullTerm} && !${right.nullTerm}) {
-        |  $resultTerm = ${left.resultTerm} && ${right.resultTerm};
-        |  $nullTerm = false;
-        |}
-        |else if (!${left.nullTerm} && ${left.resultTerm} && 
${right.nullTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |else if (!${left.nullTerm} && !${left.resultTerm} && 
${right.nullTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = false;
-        |}
-        |else if (${left.nullTerm} && !${right.nullTerm} && 
${right.resultTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |else if (${left.nullTerm} && !${right.nullTerm} && 
!${right.resultTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = false;
-        |}
-        |else {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${left.code}
-        |${right.code}
-        |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
-  }
-
-  def generateOr(
-      nullCheck: Boolean,
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-
-    val operatorCode = if (nullCheck) {
-      // Three-valued logic:
-      // no Unknown -> Two-valued logic
-      // True && Unknown -> True
-      // False && Unknown -> Unknown
-      // Unknown && True -> True
-      // Unknown && False -> Unknown
-      // Unknown && Unknown -> Unknown
-      s"""
-        |${left.code}
-        |${right.code}
-        |boolean $resultTerm;
-        |boolean $nullTerm;
-        |if (!${left.nullTerm} && !${right.nullTerm}) {
-        |  $resultTerm = ${left.resultTerm} || ${right.resultTerm};
-        |  $nullTerm = false;
-        |}
-        |else if (!${left.nullTerm} && ${left.resultTerm} && 
${right.nullTerm}) {
-        |  $resultTerm = true;
-        |  $nullTerm = false;
-        |}
-        |else if (!${left.nullTerm} && !${left.resultTerm} && 
${right.nullTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |else if (${left.nullTerm} && !${right.nullTerm} && 
${right.resultTerm}) {
-        |  $resultTerm = true;
-        |  $nullTerm = false;
-        |}
-        |else if (${left.nullTerm} && !${right.nullTerm} && 
!${right.resultTerm}) {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |else {
-        |  $resultTerm = false;
-        |  $nullTerm = true;
-        |}
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${left.code}
-        |${right.code}
-        |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
-  }
-
-  def generateNot(
-      nullCheck: Boolean,
-      operand: GeneratedExpression)
-    : GeneratedExpression = {
-    // Three-valued logic:
-    // no Unknown -> Two-valued logic
-    // Unknown -> Unknown
-    generateUnaryOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, operand) {
-      (operandTerm) => s"!($operandTerm)"
-    }
-  }
-
-  def generateCast(
-      nullCheck: Boolean,
-      operand: GeneratedExpression,
-      targetType: TypeInformation[_])
-    : GeneratedExpression = {
-    targetType match {
-      // identity casting
-      case operand.resultType =>
-        generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-          (operandTerm) => s"$operandTerm"
-        }
-
-      // * -> String
-      case STRING_TYPE_INFO =>
-        generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-          (operandTerm) => s""" "" + $operandTerm"""
-        }
-
-      // * -> Date
-      case DATE_TYPE_INFO =>
-        throw new CodeGenException("Date type not supported yet.")
-
-      // * -> Void
-      case VOID_TYPE_INFO =>
-        throw new CodeGenException("Void type not supported.")
-
-      // * -> Character
-      case CHAR_TYPE_INFO =>
-        throw new CodeGenException("Character type not supported.")
-
-      // NUMERIC TYPE -> Boolean
-      case BOOLEAN_TYPE_INFO if isNumeric(operand) =>
-        generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-          (operandTerm) => s"$operandTerm != 0"
-        }
-
-      // String -> BASIC TYPE (not String, Date, Void, Character)
-      case ti: BasicTypeInfo[_] if isString(operand) =>
-        val wrapperClass = targetType.getTypeClass.getCanonicalName
-        generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-          (operandTerm) => s"$wrapperClass.valueOf($operandTerm)"
-        }
-
-      // NUMERIC TYPE -> NUMERIC TYPE
-      case nti: NumericTypeInfo[_] if isNumeric(operand) =>
-        val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
-        generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-          (operandTerm) => s"($targetTypeTerm) $operandTerm"
-        }
-
-      // Boolean -> NUMERIC TYPE
-      case nti: NumericTypeInfo[_] if isBoolean(operand) =>
-        val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
-        generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-          (operandTerm) => s"($targetTypeTerm) ($operandTerm ? 1 : 0)"
-        }
-
-      case _ =>
-        throw new CodeGenException(s"Unsupported cast from 
'${operand.resultType}'" +
-          s"to '$targetType'.")
-    }
-  }
-
-  // 
----------------------------------------------------------------------------------------------
-
-  private def generateUnaryOperatorIfNotNull(
-      nullCheck: Boolean,
-      resultType: TypeInformation[_],
-      operand: GeneratedExpression)
-      (expr: (String) => String)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-    val defaultValue = primitiveDefaultValue(resultType)
-
-    val operatorCode = if (nullCheck) {
-      s"""
-        |${operand.code}
-        |$resultTypeTerm $resultTerm;
-        |boolean $nullTerm;
-        |if (!${operand.nullTerm}) {
-        |  $resultTerm = ${expr(operand.resultTerm)};
-        |  $nullTerm = false;
-        |}
-        |else {
-        |  $resultTerm = $defaultValue;
-        |  $nullTerm = true;
-        |}
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${operand.code}
-        |$resultTypeTerm $resultTerm = ${expr(operand.resultTerm)};
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
-  }
-
-  private def generateOperatorIfNotNull(
-      nullCheck: Boolean,
-      resultType: TypeInformation[_],
-      left: GeneratedExpression,
-      right: GeneratedExpression)
-      (expr: (String, String) => String)
-    : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val nullTerm = newName("isNull")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-    val defaultValue = primitiveDefaultValue(resultType)
-
-    val resultCode = if (nullCheck) {
-      s"""
-        |${left.code}
-        |${right.code}
-        |boolean $nullTerm = ${left.nullTerm} || ${right.nullTerm};
-        |$resultTypeTerm $resultTerm;
-        |if ($nullTerm) {
-        |  $resultTerm = $defaultValue;
-        |}
-        |else {
-        |  $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
-        |}
-        |""".stripMargin
-    }
-    else {
-      s"""
-        |${left.code}
-        |${right.code}
-        |$resultTypeTerm $resultTerm = ${expr(left.resultTerm, 
right.resultTerm)};
-        |""".stripMargin
-    }
-
-    GeneratedExpression(resultTerm, nullTerm, resultCode, resultType)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
new file mode 100644
index 0000000..561dac7
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/CallGenerator.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen.calls
+
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
+
+trait CallGenerator {
+
+  def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala
new file mode 100644
index 0000000..f5ab6bf
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGenerator.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen.calls
+
+import java.lang.reflect.Method
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
+
+class MethodCallGenerator(returnType: TypeInformation[_], method: Method) 
extends CallGenerator {
+
+  override def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(returnType)
+    val defaultValue = primitiveDefaultValue(returnType)
+
+    val resultCode = if (codeGenerator.nullCheck) {
+      s"""
+        |${operands.map(_.code).mkString("\n")}
+        |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")};
+        |$resultTypeTerm $resultTerm;
+        |if ($nullTerm) {
+        |  $resultTerm = $defaultValue;
+        |}
+        |else {
+        |  $resultTerm = 
${method.getDeclaringClass.getCanonicalName}.${method.getName}(
+        |    ${operands.map(_.resultTerm).mkString(", ")});
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${operands.map(_.code).mkString("\n")}
+        |$resultTypeTerm $resultTerm = 
${method.getDeclaringClass.getCanonicalName}.
+        |  ${method.getName}(${operands.map(_.resultTerm).mkString(", ")});
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, resultCode, returnType)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
new file mode 100644
index 0000000..8eda314
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen.calls
+
+import java.lang.reflect.Method
+
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.collection.mutable
+
+/**
+  * Global registry of built-in advanced SQL scalar functions.
+  */
+object ScalarFunctions {
+
+  private val sqlFunctions: mutable.Map[(SqlOperator, 
Seq[TypeInformation[_]]), CallGenerator] =
+    mutable.Map()
+
+  // 
----------------------------------------------------------------------------------------------
+  addSqlFunctionMethod(
+    SUBSTRING,
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethod.SUBSTRING.method)
+
+  addSqlFunctionMethod(
+    SUBSTRING,
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethod.SUBSTRING.method)
+
+  // 
----------------------------------------------------------------------------------------------
+
+  def getCallGenerator(
+      call: SqlOperator,
+      operandTypes: Seq[TypeInformation[_]])
+    : Option[CallGenerator] = {
+    sqlFunctions.get((call, operandTypes))
+  }
+
+  // 
----------------------------------------------------------------------------------------------
+
+  private def addSqlFunctionMethod(
+      sqlOperator: SqlOperator,
+      operandTypes: Seq[TypeInformation[_]],
+      returnType: TypeInformation[_],
+      method: Method)
+    : Unit = {
+    sqlFunctions((sqlOperator, operandTypes)) = new 
MethodCallGenerator(returnType, method)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
new file mode 100644
index 0000000..8580b25
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo, 
TypeInformation}
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.{CodeGenException, 
GeneratedExpression}
+
+object ScalarOperators {
+
+  def generateArithmeticOperator(
+      operator: String,
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    // String arithmetic // TODO rework
+    if (isString(left)) {
+      generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+      (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+      }
+    }
+    // Numeric arithmetic
+    else if (isNumeric(left) && isNumeric(right)) {
+      val leftType = left.resultType.asInstanceOf[NumericTypeInfo[_]]
+      val rightType = right.resultType.asInstanceOf[NumericTypeInfo[_]]
+      val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+
+      generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+      (leftTerm, rightTerm) =>
+        // no casting required
+        if (leftType == resultType && rightType == resultType) {
+          s"$leftTerm $operator $rightTerm"
+        }
+        // left needs casting
+        else if (leftType != resultType && rightType == resultType) {
+          s"(($resultTypeTerm) $leftTerm) $operator $rightTerm"
+        }
+        // right needs casting
+        else if (leftType == resultType && rightType != resultType) {
+          s"$leftTerm $operator (($resultTypeTerm) $rightTerm)"
+        }
+        // both sides need casting
+        else {
+          s"(($resultTypeTerm) $leftTerm) $operator (($resultTypeTerm) 
$rightTerm)"
+        }
+      }
+    }
+    else {
+      throw new CodeGenException("Unsupported arithmetic operation.")
+    }
+  }
+
+  def generateUnaryArithmeticOperator(
+      operator: String,
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    generateUnaryOperatorIfNotNull(nullCheck, resultType, operand) {
+      (operandTerm) => s"$operator($operandTerm)"
+    }
+  }
+
+  def generateEquals(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+      if (isReference(left)) {
+        (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)"
+      }
+      else if (isReference(right)) {
+        (leftTerm, rightTerm) => s"$rightTerm.equals($leftTerm)"
+      }
+      else {
+        (leftTerm, rightTerm) => s"$leftTerm == $rightTerm"
+      }
+    }
+  }
+
+  def generateNotEquals(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+      if (isReference(left)) {
+        (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))"
+      }
+      else if (isReference(right)) {
+        (leftTerm, rightTerm) => s"!($rightTerm.equals($leftTerm))"
+      }
+      else {
+        (leftTerm, rightTerm) => s"$leftTerm != $rightTerm"
+      }
+    }
+  }
+
+  def generateComparison(
+      operator: String,
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+      if (isString(left) && isString(right)) {
+        (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) $operator 0"
+      }
+      else if (isNumeric(left) && isNumeric(right)) {
+        (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+      }
+      else {
+        throw new CodeGenException("Comparison is only supported for Strings 
and numeric types.")
+      }
+    }
+  }
+
+  def generateIsNull(
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val operatorCode = if (nullCheck) {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = ${operand.nullTerm};
+        |boolean $nullTerm = false;
+        |""".stripMargin
+    }
+    else if (!nullCheck && isReference(operand.resultType)) {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = ${operand.resultTerm} == null;
+        |boolean $nullTerm = false;
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = false;
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  def generateIsNotNull(
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val operatorCode = if (nullCheck) {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = !${operand.nullTerm};
+        |boolean $nullTerm = false;
+        |""".stripMargin
+    }
+    else if (!nullCheck && isReference(operand.resultType)) {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = ${operand.resultTerm} != null;
+        |boolean $nullTerm = false;
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = true;
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  def generateAnd(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+
+    val operatorCode = if (nullCheck) {
+      // Three-valued logic:
+      // no Unknown -> Two-valued logic
+      // True && Unknown -> Unknown
+      // False && Unknown -> False
+      // Unknown && True -> Unknown
+      // Unknown && False -> False
+      // Unknown && Unknown -> Unknown
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm;
+        |boolean $nullTerm;
+        |if (!${left.nullTerm} && !${right.nullTerm}) {
+        |  $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+        |  $nullTerm = false;
+        |}
+        |else if (!${left.nullTerm} && ${left.resultTerm} && 
${right.nullTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else if (!${left.nullTerm} && !${left.resultTerm} && 
${right.nullTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = false;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && 
${right.resultTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && 
!${right.resultTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = false;
+        |}
+        |else {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  def generateOr(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+
+    val operatorCode = if (nullCheck) {
+      // Three-valued logic:
+      // no Unknown -> Two-valued logic
+      // True && Unknown -> True
+      // False && Unknown -> Unknown
+      // Unknown && True -> True
+      // Unknown && False -> Unknown
+      // Unknown && Unknown -> Unknown
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm;
+        |boolean $nullTerm;
+        |if (!${left.nullTerm} && !${right.nullTerm}) {
+        |  $resultTerm = ${left.resultTerm} || ${right.resultTerm};
+        |  $nullTerm = false;
+        |}
+        |else if (!${left.nullTerm} && ${left.resultTerm} && 
${right.nullTerm}) {
+        |  $resultTerm = true;
+        |  $nullTerm = false;
+        |}
+        |else if (!${left.nullTerm} && !${left.resultTerm} && 
${right.nullTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && 
${right.resultTerm}) {
+        |  $resultTerm = true;
+        |  $nullTerm = false;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && 
!${right.resultTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  def generateNot(
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    // Three-valued logic:
+    // no Unknown -> Two-valued logic
+    // Unknown -> Unknown
+    generateUnaryOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, operand) {
+      (operandTerm) => s"!($operandTerm)"
+    }
+  }
+
+  def generateCast(
+      nullCheck: Boolean,
+      operand: GeneratedExpression,
+      targetType: TypeInformation[_])
+    : GeneratedExpression = {
+    targetType match {
+      // identity casting
+      case operand.resultType =>
+        generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+          (operandTerm) => s"$operandTerm"
+        }
+
+      // * -> String
+      case STRING_TYPE_INFO =>
+        generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+          (operandTerm) => s""" "" + $operandTerm"""
+        }
+
+      // * -> Date
+      case DATE_TYPE_INFO =>
+        throw new CodeGenException("Date type not supported yet.")
+
+      // * -> Void
+      case VOID_TYPE_INFO =>
+        throw new CodeGenException("Void type not supported.")
+
+      // * -> Character
+      case CHAR_TYPE_INFO =>
+        throw new CodeGenException("Character type not supported.")
+
+      // NUMERIC TYPE -> Boolean
+      case BOOLEAN_TYPE_INFO if isNumeric(operand) =>
+        generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+          (operandTerm) => s"$operandTerm != 0"
+        }
+
+      // String -> BASIC TYPE (not String, Date, Void, Character)
+      case ti: BasicTypeInfo[_] if isString(operand) =>
+        val wrapperClass = targetType.getTypeClass.getCanonicalName
+        generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+          (operandTerm) => s"$wrapperClass.valueOf($operandTerm)"
+        }
+
+      // NUMERIC TYPE -> NUMERIC TYPE
+      case nti: NumericTypeInfo[_] if isNumeric(operand) =>
+        val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
+        generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+          (operandTerm) => s"($targetTypeTerm) $operandTerm"
+        }
+
+      // Boolean -> NUMERIC TYPE
+      case nti: NumericTypeInfo[_] if isBoolean(operand) =>
+        val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
+        generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+          (operandTerm) => s"($targetTypeTerm) ($operandTerm ? 1 : 0)"
+        }
+
+      case _ =>
+        throw new CodeGenException(s"Unsupported cast from 
'${operand.resultType}'" +
+          s"to '$targetType'.")
+    }
+  }
+
+  // 
----------------------------------------------------------------------------------------------
+
+  private def generateUnaryOperatorIfNotNull(
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      operand: GeneratedExpression)
+      (expr: (String) => String)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val defaultValue = primitiveDefaultValue(resultType)
+
+    val operatorCode = if (nullCheck) {
+      s"""
+        |${operand.code}
+        |$resultTypeTerm $resultTerm;
+        |boolean $nullTerm;
+        |if (!${operand.nullTerm}) {
+        |  $resultTerm = ${expr(operand.resultTerm)};
+        |  $nullTerm = false;
+        |}
+        |else {
+        |  $resultTerm = $defaultValue;
+        |  $nullTerm = true;
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${operand.code}
+        |$resultTypeTerm $resultTerm = ${expr(operand.resultTerm)};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
+  }
+
+  private def generateOperatorIfNotNull(
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+      (expr: (String, String) => String)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val defaultValue = primitiveDefaultValue(resultType)
+
+    val resultCode = if (nullCheck) {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $nullTerm = ${left.nullTerm} || ${right.nullTerm};
+        |$resultTypeTerm $resultTerm;
+        |if ($nullTerm) {
+        |  $resultTerm = $defaultValue;
+        |}
+        |else {
+        |  $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${left.code}
+        |${right.code}
+        |$resultTypeTerm $resultTerm = ${expr(left.resultTerm, 
right.resultTerm)};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, resultCode, resultType)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
index a39d601..bf551dd 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
IntegerTypeInfo}
 case class Substring(
     str: Expression,
     beginIndex: Expression,
-    endIndex: Expression) extends Expression {
+    endIndex: Option[Expression] = None) extends Expression {
   def typeInfo = {
     if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
       throw new ExpressionException(
@@ -33,14 +33,23 @@ case class Substring(
       throw new ExpressionException(
         s"""Begin index must be an integer type in $this, is 
${beginIndex.typeInfo}.""")
     }
-    if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""End index must be an integer type in $this, is 
${endIndex.typeInfo}.""")
+    endIndex match {
+      case Some(endIdx) if !endIdx.typeInfo.isInstanceOf[IntegerTypeInfo[_]] =>
+        throw new ExpressionException(
+            s"""End index must be an integer type in $this, is 
${endIdx.typeInfo}.""")
+        case _ => // ok
     }
 
     BasicTypeInfo.STRING_TYPE_INFO
   }
 
-  override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
-  override def toString = s"($str).substring($beginIndex, $endIndex)"
+  override def children: Seq[Expression] = endIndex match {
+    case Some(endIdx) => Seq(str, beginIndex, endIdx)
+    case None => Seq(str, beginIndex)
+  }
+
+  override def toString = endIndex match {
+    case Some(endIdx) => s"($str).substring($beginIndex, $endIndex)"
+    case None => s"($str).substring($beginIndex)"
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
index d082c7e..ebbbf8b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
@@ -125,13 +125,13 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 
   lazy val substring: PackratParser[Expression] =
     atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ {
-      case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to)
+      case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, Some(to))
 
     }
 
   lazy val substringWithoutEnd: PackratParser[Expression] =
     atom ~ ".substring(" ~ expression ~ ")" ^^ {
-      case e ~ _ ~ from ~ _ => Substring(e, from, Literal(Integer.MAX_VALUE))
+      case e ~ _ ~ from ~ _ => Substring(e, from)
 
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index bad111f..fb9d057 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -52,17 +52,25 @@ object RexNodeTranslator {
         val l = extractAggCalls(b.left, relBuilder)
         val r = extractAggCalls(b.right, relBuilder)
         (b.makeCopy(List(l._1, r._1)), l._2 ::: r._2)
-      case s: Substring =>
+
+      // Scalar functions
+      case s@Substring(_, _, Some(endIndex)) =>
         val str = extractAggCalls(s.str, relBuilder)
         val sta = extractAggCalls(s.beginIndex, relBuilder)
-        val end = extractAggCalls(s.endIndex, relBuilder)
+        val end = extractAggCalls(endIndex, relBuilder)
         (s.makeCopy(
-          List(str._1, sta._1, end._1)),
+          List(str._1, sta._1, Some(end._1))),
           (str._2 ::: sta._2) ::: end._2
         )
-      case e@_ =>
+
+      case s@Substring(_, _, None) =>
+        val str = extractAggCalls(s.str, relBuilder)
+        val sta = extractAggCalls(s.beginIndex, relBuilder)
+        (s.makeCopy(List(str._1, sta._1, None)), str._2 ::: sta._2)
+
+      case e@AnyRef =>
         throw new IllegalArgumentException(
-          s"Expression ${e} of type ${e.getClass()} not supported yet")
+          s"Expression $e of type ${e.getClass} not supported yet")
     }
   }
 
@@ -72,6 +80,7 @@ object RexNodeTranslator {
   def toRexNode(exp: Expression, relBuilder: RelBuilder): RexNode = {
 
     exp match {
+      // Basic operators
       case Literal(value, tpe) =>
         relBuilder.literal(value)
       case ResolvedFieldReference(name, tpe) =>
@@ -151,16 +160,24 @@ object RexNodeTranslator {
       case UnaryMinus(child) =>
         val c = toRexNode(child, relBuilder)
         relBuilder.call(SqlStdOperatorTable.UNARY_MINUS, c)
-      case Substring(string, start, end) =>
+
+      // Scalar functions
+      case Substring(string, start, Some(end)) =>
         val str = toRexNode(string, relBuilder)
         val sta = toRexNode(start, relBuilder)
         val en = toRexNode(end, relBuilder)
         relBuilder.call(SqlStdOperatorTable.SUBSTRING, str, sta, en)
+
+      case Substring(string, start, None) =>
+        val str = toRexNode(string, relBuilder)
+        val sta = toRexNode(start, relBuilder)
+        relBuilder.call(SqlStdOperatorTable.SUBSTRING, str, sta)
+
       case a: Aggregation =>
-        throw new IllegalArgumentException(s"Aggregation expression ${a} not 
allowed at this place")
-      case e@_ =>
+        throw new IllegalArgumentException(s"Aggregation expression $a not 
allowed at this place")
+      case e@AnyRef =>
         throw new IllegalArgumentException(
-          s"Expression ${e} of type ${e.getClass()} not supported yet")
+          s"Expression $e of type ${e.getClass} not supported yet")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
index e1c9e96..628cbef 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -25,13 +25,11 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.codegen.CodeGenException;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
-import org.apache.flink.api.table.test.TableProgramsTestBase;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
index e009cb8..5b22574 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.codegen.CodeGenException;
-import org.apache.flink.api.table.test.TableProgramsTestBase;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
index 222f161..8c30163 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.test.TableProgramsTestBase;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
index b8ca4cd..c69d1a7 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.test.TableProgramsTestBase;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.junit.Test;
 import org.junit.runner.RunWith;

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
index c4ac138..3ce9891 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.test.TableProgramsTestBase;
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.junit.Test;
 import org.junit.runner.RunWith;

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
index 315fe9f..6b8f984 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -40,7 +40,7 @@ public class StringExpressionsITCase extends 
MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Test(expected = CodeGenException.class)
+       @Test
        public void testSubstring() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
@@ -52,7 +52,7 @@ public class StringExpressionsITCase extends 
MultipleProgramsTestBase {
                Table in = tableEnv.fromDataSet(ds, "a, b");
 
                Table result = in
-                               .select("a.substring(0, b)");
+                               .select("a.substring(1, b)");
 
                DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
                List<Row> results = resultSet.collect();
@@ -60,14 +60,14 @@ public class StringExpressionsITCase extends 
MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Test(expected = CodeGenException.class)
+       @Test
        public void testSubstringWithMaxEnd() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();
 
                DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-                               new Tuple2<>("ABCD", 2),
-                               new Tuple2<>("ABCD", 1));
+                               new Tuple2<>("ABCD", 3),
+                               new Tuple2<>("ABCD", 2));
 
                Table in = tableEnv.fromDataSet(ds, "a, b");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
index 54580c4..f2120ef 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala
@@ -22,8 +22,9 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.test.TableProgramsTestBase
-import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index 8ab44a3..ba0311a 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -25,8 +25,9 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.Row
 import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.test.TableProgramsTestBase
-import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 96b9341..c73d7eb 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -23,8 +23,9 @@ import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.Row
 import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.test.TableProgramsTestBase
-import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
index e181e0b..4a78079 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala
@@ -22,8 +22,9 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.test.TableProgramsTestBase
-import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
index f2e9aca..c4fc346 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala
@@ -18,35 +18,35 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
-import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.api.table.Row
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
+
 import scala.collection.JavaConverters._
-import org.apache.flink.api.table.codegen.CodeGenException
 
 @RunWith(classOf[Parameterized])
 class StringExpressionsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) {
 
-  @Test(expected = classOf[CodeGenException])
+  @Test
   def testSubstring(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val t = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
-      .select('a.substring(0, 'b))
+      .select('a.substring(1, 'b))
 
     val expected = "AA\nB"
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[CodeGenException])
+  @Test
   def testSubstringWithMaxEnd(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val t = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
+    val t = env.fromElements(("ABCD", 3), ("ABCD", 2)).as('a, 'b)
       .select('a.substring('b))
 
     val expected = "CD\nBCD"

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
new file mode 100644
index 0000000..9989c1a
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.test
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.parser.ExpressionParser
+import org.apache.flink.api.table.test.utils.ExpressionEvaluator
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class ScalarFunctionsTest {
+
+  @Test
+  def testSubstring(): Unit = {
+    testFunction(
+      'f0.substring(2),
+      "f0.substring(2)",
+      "SUBSTRING(f0, 2)",
+      "his is a test String.")
+
+    testFunction(
+      'f0.substring(2, 5),
+      "f0.substring(2, 5)",
+      "SUBSTRING(f0, 2, 5)",
+      "his i")
+
+    testFunction(
+      'f0.substring(1, 'f7),
+      "f0.substring(1, f7)",
+      "SUBSTRING(f0, 1, f7)",
+      "Thi")
+  }
+
+  // 
----------------------------------------------------------------------------------------------
+
+  def testFunction(
+      expr: Expression,
+      exprString: String,
+      sqlExpr: String,
+      expected: String): Unit = {
+    val testData = new Row(8)
+    testData.setField(0, "This is a test String.")
+    testData.setField(1, true)
+    testData.setField(2, 42.toByte)
+    testData.setField(3, 43.toShort)
+    testData.setField(4, 44.toLong)
+    testData.setField(5, 4.5.toFloat)
+    testData.setField(6, 4.6)
+    testData.setField(7, 3)
+
+    val typeInfo = new RowTypeInfo(Seq(
+      STRING_TYPE_INFO,
+      BOOLEAN_TYPE_INFO,
+      BYTE_TYPE_INFO,
+      SHORT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      FLOAT_TYPE_INFO,
+      DOUBLE_TYPE_INFO,
+      INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
+
+    val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr)
+    assertEquals(expected, exprResult)
+
+    val exprStringResult = ExpressionEvaluator.evaluate(
+      testData,
+      typeInfo,
+      ExpressionParser.parseExpression(exprString))
+    assertEquals(expected, exprStringResult)
+
+    // TODO test SQL expression
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala
deleted file mode 100644
index 0962f4e..0000000
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.test
-
-import java.util
-
-import org.apache.flink.api.java.table.{TableEnvironment => JavaTableEnv}
-import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
-import org.apache.flink.api.scala.table.{TableEnvironment => ScalaTableEnv}
-import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaEnv}
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode
-import 
org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode.{EFFICIENT,
 NULL}
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConversions._
-
-class TableProgramsTestBase(
-    mode: TestExecutionMode,
-    tableConfigMode: TableConfigMode)
-  extends MultipleProgramsTestBase(mode) {
-
-  def getJavaTableEnvironment: JavaTableEnv = {
-    val env = JavaEnv.getExecutionEnvironment // TODO pass it to tableEnv
-    val tableEnv = new JavaTableEnv
-    configure(tableEnv.getConfig)
-    tableEnv
-  }
-
-  def getScalaTableEnvironment: ScalaTableEnv = {
-    val env = ScalaEnv.getExecutionEnvironment // TODO pass it to tableEnv
-    val tableEnv = new ScalaTableEnv
-    configure(tableEnv.getConfig)
-    tableEnv
-  }
-
-  def getConfig: TableConfig = {
-    val config = new TableConfig()
-    configure(config)
-    config
-  }
-
-  def configure(config: TableConfig): Unit = {
-    tableConfigMode match {
-      case NULL =>
-        config.setNullCheck(true)
-      case EFFICIENT =>
-        config.setEfficientTypeUsage(true)
-      case _ => // keep default
-    }
-  }
-
-}
-
-object TableProgramsTestBase {
-  sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: 
Boolean }
-  object TableConfigMode {
-    case object DEFAULT extends TableConfigMode {
-      val nullCheck = false; val efficientTypes = false
-    }
-    case object NULL extends TableConfigMode {
-      val nullCheck = true; val efficientTypes = false
-    }
-    case object EFFICIENT extends TableConfigMode {
-      val nullCheck = false; val efficientTypes = true
-    }
-  }
-
-  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
-  def tableConfigs(): util.Collection[Array[java.lang.Object]] = {
-    Seq(
-      // TODO more tests in cluster mode?
-      Array[AnyRef](TestExecutionMode.CLUSTER, TableConfigMode.DEFAULT),
-      Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT),
-      Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.NULL),
-      Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.EFFICIENT)
-    )
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
new file mode 100644
index 0000000..b06ac26
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.test.utils
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.functions.{Function, MapFunction}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.plan.{RexNodeTranslator, TranslationContext}
+import org.apache.flink.api.table.plan.schema.DataSetTable
+import org.apache.flink.api.table.runtime.FunctionCompiler
+import org.mockito.Mockito._
+
+/**
+  * Utility to translate and evaluate an RexNode or Table API expression to a 
String.
+  */
+object ExpressionEvaluator {
+
+  // TestCompiler that uses current class loader
+  class TestCompiler[T <: Function] extends FunctionCompiler[T] {
+    def compile(genFunc: GeneratedFunction[T]): Class[T] =
+      compile(getClass.getClassLoader, genFunc.name, genFunc.code)
+  }
+
+  private def prepareRelBuilder(typeInfo: TypeInformation[Any]): RelBuilder = {
+    // create DataSetTable
+    val dataSetMock = mock(classOf[DataSet[Any]])
+    when(dataSetMock.getType).thenReturn(typeInfo)
+    val tableName = TranslationContext.addDataSet(new DataSetTable[Any](
+      dataSetMock,
+      (0 until typeInfo.getArity).toArray,
+      (0 until typeInfo.getArity).map("f" + _).toArray))
+
+    // prepare RelBuilder
+    val relBuilder = TranslationContext.getRelBuilder
+    relBuilder.scan(tableName)
+    relBuilder
+  }
+
+  def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): 
String = {
+    val relBuilder = prepareRelBuilder(typeInfo)
+    evaluate(data, typeInfo, relBuilder, RexNodeTranslator.toRexNode(expr, 
relBuilder))
+  }
+
+  def evaluate(
+      data: Any,
+      typeInfo: TypeInformation[Any],
+      relBuilder: RelBuilder,
+      rexNode: RexNode): String = {
+    // generate code for Mapper
+    val config = new TableConfig()
+    val generator = new CodeGenerator(config, typeInfo)
+    val genExpr = generator.generateExpression(relBuilder.cast(rexNode, 
VARCHAR)) // cast to String
+    val bodyCode =
+      s"""
+        |${genExpr.code}
+        |return ${genExpr.resultTerm};
+        |""".stripMargin
+    val genFunc = generator.generateFunction[MapFunction[Any, String]](
+      "TestFunction",
+      classOf[MapFunction[Any, String]],
+      bodyCode,
+      STRING_TYPE_INFO.asInstanceOf[TypeInformation[Any]])
+
+    // compile and evaluate
+    val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc)
+    val mapper = clazz.newInstance()
+    mapper.map(data)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e220745/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
new file mode 100644
index 0000000..78be519
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/TableProgramsTestBase.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.test.utils
+
+import java.util
+
+import org.apache.flink.api.java.table.{TableEnvironment => JavaTableEnv}
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
+import org.apache.flink.api.scala.table.{TableEnvironment => ScalaTableEnv}
+import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaEnv}
+import org.apache.flink.api.table.TableConfig
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode.{EFFICIENT,
 NULL}
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConversions._
+
+class TableProgramsTestBase(
+    mode: TestExecutionMode,
+    tableConfigMode: TableConfigMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  def getJavaTableEnvironment: JavaTableEnv = {
+    val env = JavaEnv.getExecutionEnvironment // TODO pass it to tableEnv
+    val tableEnv = new JavaTableEnv
+    configure(tableEnv.getConfig)
+    tableEnv
+  }
+
+  def getScalaTableEnvironment: ScalaTableEnv = {
+    val env = ScalaEnv.getExecutionEnvironment // TODO pass it to tableEnv
+    val tableEnv = new ScalaTableEnv
+    configure(tableEnv.getConfig)
+    tableEnv
+  }
+
+  def getConfig: TableConfig = {
+    val config = new TableConfig()
+    configure(config)
+    config
+  }
+
+  def configure(config: TableConfig): Unit = {
+    tableConfigMode match {
+      case NULL =>
+        config.setNullCheck(true)
+      case EFFICIENT =>
+        config.setEfficientTypeUsage(true)
+      case _ => // keep default
+    }
+  }
+
+}
+
+object TableProgramsTestBase {
+  sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: 
Boolean }
+  object TableConfigMode {
+    case object DEFAULT extends TableConfigMode {
+      val nullCheck = false; val efficientTypes = false
+    }
+    case object NULL extends TableConfigMode {
+      val nullCheck = true; val efficientTypes = false
+    }
+    case object EFFICIENT extends TableConfigMode {
+      val nullCheck = false; val efficientTypes = true
+    }
+  }
+
+  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  def tableConfigs(): util.Collection[Array[java.lang.Object]] = {
+    Seq(
+      // TODO more tests in cluster mode?
+      Array[AnyRef](TestExecutionMode.CLUSTER, TableConfigMode.DEFAULT),
+      Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT),
+      Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.NULL),
+      Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.EFFICIENT)
+    )
+  }
+}

Reply via email to