This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bc6c2cec37c [FLINK-33663] Serialize CallExpressions into SQL (#23811)
bc6c2cec37c is described below

commit bc6c2cec37c45f021ae22a2a7b5ab9537b8506cd
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Tue Dec 5 09:46:45 2023 +0100

    [FLINK-33663] Serialize CallExpressions into SQL (#23811)
---
 .../expressions/ExpressionSerializationTest.java   | 364 +++++++++++++++++++++
 .../flink/table/expressions/CallExpression.java    |  27 ++
 .../expressions/FieldReferenceExpression.java      |   6 +
 .../table/expressions/TypeLiteralExpression.java   |  10 +
 .../table/functions/BuiltInFunctionDefinition.java |  55 ++++
 .../functions/BuiltInFunctionDefinitions.java      | 135 +++++++-
 .../flink/table/functions/CallSyntaxUtils.java     |  49 +++
 .../table/functions/JsonFunctionsCallSyntax.java   | 185 +++++++++++
 .../flink/table/functions/SqlCallSyntax.java       | 275 ++++++++++++++++
 .../BuiltInAggregateFunctionTestBase.java          | 185 ++++++++++-
 .../planner/functions/BuiltInFunctionTestBase.java |  35 +-
 .../functions/IfThenElseFunctionITCase.java        |  55 ++++
 .../functions/JsonAggregationFunctionsITCase.java  |  67 ++--
 13 files changed, 1396 insertions(+), 52 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java
new file mode 100644
index 00000000000..2693cb38517
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/ExpressionSerializationTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.JsonExistsOnError;
+import org.apache.flink.table.api.JsonOnNull;
+import org.apache.flink.table.api.JsonQueryOnEmptyOrError;
+import org.apache.flink.table.api.JsonQueryWrapper;
+import org.apache.flink.table.api.JsonType;
+import org.apache.flink.table.api.JsonValueOnEmptyOrError;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.operations.ValuesQueryOperation;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeFactoryMock;
+import org.apache.flink.table.utils.FunctionLookupMock;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for serializing {@link BuiltInFunctionDefinitions} into a SQL string. */
+public class ExpressionSerializationTest {
+
+    /**
+     * Supplies the cases for {@link #testSerialization(TestSpec)}. Each spec pairs a Table API
+     * expression and the types of the fields it references with the SQL string that the resolved
+     * expression is expected to serialize to.
+     */
+    public static Stream<TestSpec> testData() {
+        return Stream.of(
+                TestSpec.forExpr(Expressions.uuid()).expectStr("UUID()"),
+                TestSpec.forExpr($("f0").abs())
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("ABS(`f0`)"),
+                TestSpec.forExpr($("f0").isLess(123))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("`f0` < 123"),
+                TestSpec.forExpr($("f0").isLessOrEqual(123))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("`f0` <= 123"),
+                TestSpec.forExpr($("f0").isEqual(123))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("`f0` = 123"),
+                TestSpec.forExpr($("f0").isNotEqual(123))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("`f0` <> 123"),
+                TestSpec.forExpr($("f0").isGreaterOrEqual(123))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("`f0` >= 123"),
+                TestSpec.forExpr($("f0").isGreater(123))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("`f0` > 123"),
+                TestSpec.forExpr($("f0").isNull())
+                        .withField("f0", DataTypes.BOOLEAN())
+                        .expectStr("`f0` IS NULL"),
+                TestSpec.forExpr($("f0").isNotNull())
+                        .withField("f0", DataTypes.BOOLEAN())
+                        .expectStr("`f0` IS NOT NULL"),
+                TestSpec.forExpr($("f0").isTrue())
+                        .withField("f0", DataTypes.BOOLEAN())
+                        .expectStr("`f0` IS TRUE"),
+                TestSpec.forExpr($("f0").isNotTrue())
+                        .withField("f0", DataTypes.BOOLEAN())
+                        .expectStr("`f0` IS NOT TRUE"),
+                TestSpec.forExpr($("f0").isFalse())
+                        .withField("f0", DataTypes.BOOLEAN())
+                        .expectStr("`f0` IS FALSE"),
+                TestSpec.forExpr($("f0").isNotFalse())
+                        .withField("f0", DataTypes.BOOLEAN())
+                        .expectStr("`f0` IS NOT FALSE"),
+                TestSpec.forExpr($("f0").not())
+                        .withField("f0", DataTypes.BOOLEAN())
+                        .expectStr("NOT `f0`"),
+                TestSpec.forExpr(
+                                Expressions.and(
+                                        $("f0").isNotNull(),
+                                        $("f0").isLess(420),
+                                        $("f0").isGreater(123)))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("(`f0` IS NOT NULL) AND (`f0` < 420) AND 
(`f0` > 123)"),
+                TestSpec.forExpr(
+                                Expressions.or(
+                                        $("f0").isNotNull(),
+                                        $("f0").isLess(420),
+                                        $("f0").isGreater(123)))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("(`f0` IS NOT NULL) OR (`f0` < 420) OR 
(`f0` > 123)"),
+                TestSpec.forExpr(
+                                Expressions.ifThenElse(
+                                        $("f0").isNotNull(), 
$("f0").plus(420), $("f0").minus(123)))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr(
+                                "CASE WHEN `f0` IS NOT NULL THEN `f0` + 420 
ELSE `f0` - 123 END"),
+                TestSpec.forExpr($("f0").times(3).dividedBy($("f1")))
+                        .withField("f0", DataTypes.BIGINT())
+                        .withField("f1", DataTypes.BIGINT())
+                        .expectStr("(`f0` * 3) / `f1`"),
+                TestSpec.forExpr($("f0").mod(5))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("`f0` % 5"),
+                TestSpec.forExpr(Expressions.negative($("f0")))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("- `f0`"),
+                TestSpec.forExpr($("f0").in(1, 2, 3, 4, 5))
+                        .withField("f0", DataTypes.INT())
+                        .expectStr("`f0` IN (1, 2, 3, 4, 5)"),
+                TestSpec.forExpr($("f0").cast(DataTypes.SMALLINT()))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("CAST(`f0` AS SMALLINT)"),
+                TestSpec.forExpr($("f0").tryCast(DataTypes.SMALLINT()))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("TRY_CAST(`f0` AS SMALLINT)"),
+                TestSpec.forExpr(Expressions.array($("f0"), $("f1"), "ABC"))
+                        .withField("f0", DataTypes.STRING())
+                        .withField("f1", DataTypes.STRING())
+                        .expectStr("ARRAY[`f0`, `f1`, 'ABC']"),
+                TestSpec.forExpr(Expressions.map($("f0"), $("f1"), "ABC", 
"DEF"))
+                        .withField("f0", DataTypes.STRING())
+                        .withField("f1", DataTypes.STRING())
+                        .expectStr("MAP[`f0`, `f1`, 'ABC', 'DEF']"),
+                TestSpec.forExpr($("f0").at(2))
+                        .withField("f0", DataTypes.ARRAY(DataTypes.STRING()))
+                        .expectStr("`f0`[2]"),
+                TestSpec.forExpr($("f0").at("abc"))
+                        .withField("f0", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.BIGINT()))
+                        .expectStr("`f0`['abc']"),
+                TestSpec.forExpr($("f0").between(1, 10))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("`f0` BETWEEN 1 AND 10"),
+                TestSpec.forExpr($("f0").notBetween(1, 10))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("`f0` NOT BETWEEN 1 AND 10"),
+                TestSpec.forExpr($("f0").like("ABC"))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("`f0` LIKE 'ABC'"),
+                TestSpec.forExpr($("f0").similar("ABC"))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("`f0` SIMILAR TO 'ABC'"),
+                TestSpec.forExpr($("f0").position("ABC"))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("POSITION(`f0` IN 'ABC')"),
+                TestSpec.forExpr($("f0").trim("ABC"))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("TRIM BOTH 'ABC' FROM `f0`"),
+                TestSpec.forExpr($("f0").trimLeading("ABC"))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("TRIM LEADING 'ABC' FROM `f0`"),
+                TestSpec.forExpr($("f0").trimTrailing("ABC"))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("TRIM TRAILING 'ABC' FROM `f0`"),
+                TestSpec.forExpr($("f0").overlay("ABC", 2))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("OVERLAY(`f0` PLACING 'ABC' FROM 2)"),
+                TestSpec.forExpr($("f0").overlay("ABC", 2, 5))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("OVERLAY(`f0` PLACING 'ABC' FROM 2 FOR 5)"),
+                TestSpec.forExpr($("f0").substr(2))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("SUBSTR(`f0` FROM 2)"),
+                TestSpec.forExpr($("f0").substr(2, 5))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("SUBSTR(`f0` FROM 2 FOR 5)"),
+                TestSpec.forExpr($("f0").substring(2))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("SUBSTRING(`f0` FROM 2)"),
+                TestSpec.forExpr($("f0").substring(2, 5))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("SUBSTRING(`f0` FROM 2 FOR 5)"),
+                TestSpec.forExpr($("f0").extract(TimeIntervalUnit.HOUR))
+                        .withField("f0", DataTypes.TIMESTAMP())
+                        .expectStr("EXTRACT(HOUR FROM `f0`)"),
+                TestSpec.forExpr($("f0").floor(TimeIntervalUnit.HOUR))
+                        .withField("f0", DataTypes.TIMESTAMP())
+                        .expectStr("FLOOR(`f0` TO HOUR)"),
+                TestSpec.forExpr($("f0").ceil(TimeIntervalUnit.HOUR))
+                        .withField("f0", DataTypes.TIMESTAMP())
+                        .expectStr("CEIL(`f0` TO HOUR)"),
+                TestSpec.forExpr(
+                                Expressions.temporalOverlaps(
+                                        $("f0"), $("f1"),
+                                        $("f2"), $("f3")))
+                        .withField("f0", DataTypes.TIMESTAMP())
+                        .withField("f1", DataTypes.TIMESTAMP())
+                        .withField("f2", DataTypes.TIMESTAMP())
+                        .withField("f3", DataTypes.TIMESTAMP())
+                        .expectStr("(`f0`, `f1`) OVERLAPS (`f2`, `f3`)"),
+                
TestSpec.forExpr($("f0").get("g0").plus($("f0").get("g1").get("h1")))
+                        .withField(
+                                "f0",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("g0", 
DataTypes.BIGINT()),
+                                        DataTypes.FIELD(
+                                                "g1",
+                                                DataTypes.ROW(
+                                                        DataTypes.FIELD(
+                                                                "h1", 
DataTypes.BIGINT())))))
+                        .expectStr("(`f0`.`g0`) + (`f0`.`g1`.`h1`)"),
+                TestSpec.forExpr($("f0").abs().as("absolute`F0"))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr("(ABS(`f0`)) AS `absolute``F0`"),
+
+                // JSON predicates, query functions, constructors and JSON aggregates
+                TestSpec.forExpr($("f0").isJson())
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("`f0` IS JSON"),
+                TestSpec.forExpr($("f0").isJson(JsonType.SCALAR))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("`f0` IS JSON SCALAR"),
+                TestSpec.forExpr($("f0").jsonExists("$.a"))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("JSON_EXISTS(`f0`, '$.a')"),
+                TestSpec.forExpr($("f0").jsonExists("$.a", 
JsonExistsOnError.UNKNOWN))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("JSON_EXISTS(`f0`, '$.a' UNKNOWN ON 
ERROR)"),
+                TestSpec.forExpr($("f0").jsonValue("$.a"))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr(
+                                "JSON_VALUE(`f0`, '$.a' RETURNING 
VARCHAR(2147483647) NULL ON EMPTY NULL ON ERROR)"),
+                TestSpec.forExpr($("f0").jsonValue("$.a", DataTypes.BOOLEAN(), 
false))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr(
+                                "JSON_VALUE(`f0`, '$.a' RETURNING BOOLEAN 
DEFAULT FALSE ON EMPTY DEFAULT FALSE ON ERROR)"),
+                TestSpec.forExpr(
+                                $("f0").jsonValue(
+                                                "$.a",
+                                                DataTypes.BIGINT(),
+                                                
JsonValueOnEmptyOrError.DEFAULT,
+                                                1,
+                                                JsonValueOnEmptyOrError.ERROR,
+                                                null))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr(
+                                "JSON_VALUE(`f0`, '$.a' RETURNING BIGINT 
DEFAULT 1 ON EMPTY ERROR ON ERROR)"),
+                TestSpec.forExpr($("f0").jsonQuery("$.a"))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr(
+                                "JSON_QUERY(`f0`, '$.a' WITHOUT ARRAY WRAPPER 
NULL ON EMPTY NULL ON ERROR)"),
+                TestSpec.forExpr($("f0").jsonQuery("$.a", 
JsonQueryWrapper.UNCONDITIONAL_ARRAY))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr(
+                                "JSON_QUERY(`f0`, '$.a' WITH UNCONDITIONAL 
ARRAY WRAPPER NULL ON EMPTY NULL ON ERROR)"),
+                TestSpec.forExpr(
+                                $("f0").jsonQuery(
+                                                "$.a",
+                                                
JsonQueryWrapper.CONDITIONAL_ARRAY,
+                                                
JsonQueryOnEmptyOrError.EMPTY_OBJECT,
+                                                
JsonQueryOnEmptyOrError.EMPTY_ARRAY))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr(
+                                "JSON_QUERY(`f0`, '$.a' WITH CONDITIONAL ARRAY 
WRAPPER EMPTY OBJECT ON EMPTY EMPTY ARRAY ON ERROR)"),
+                TestSpec.forExpr(
+                                Expressions.jsonObject(JsonOnNull.ABSENT, 
"k1", $("f0"), "k2", 123))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr(
+                                "JSON_OBJECT(KEY 'k1' VALUE `f0`, KEY 'k2' 
VALUE 123 ABSENT ON NULL)"),
+                TestSpec.forExpr(Expressions.jsonArray(JsonOnNull.ABSENT, 
"k1", $("f0"), "k2"))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("JSON_ARRAY('k1', `f0`, 'k2' ABSENT ON 
NULL)"),
+                TestSpec.forExpr(Expressions.jsonArrayAgg(JsonOnNull.ABSENT, 
$("f0")))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("JSON_ARRAYAGG(`f0` ABSENT ON NULL)"),
+                TestSpec.forExpr(Expressions.jsonArrayAgg(JsonOnNull.NULL, 
$("f0")))
+                        .withField("f0", DataTypes.STRING())
+                        .expectStr("JSON_ARRAYAGG(`f0` NULL ON NULL)"),
+                TestSpec.forExpr(Expressions.jsonObjectAgg(JsonOnNull.ABSENT, 
$("f0"), $("f1")))
+                        .withField("f0", DataTypes.STRING())
+                        .withField("f1", DataTypes.STRING())
+                        .expectStr("JSON_OBJECTAGG(KEY `f0` VALUE `f1` ABSENT 
ON NULL)"),
+                TestSpec.forExpr(Expressions.jsonObjectAgg(JsonOnNull.NULL, 
$("f0"), $("f1")))
+                        .withField("f0", DataTypes.STRING())
+                        .withField("f1", DataTypes.STRING())
+                        .expectStr("JSON_OBJECTAGG(KEY `f0` VALUE `f1` NULL ON 
NULL)"),
+
+                // Aggregate functions, including DISTINCT, nested inside arithmetic
+                TestSpec.forExpr(
+                                $("f0").count()
+                                        .distinct()
+                                        .plus($("f0").avg().distinct())
+                                        .plus($("f0").max()))
+                        .withField("f0", DataTypes.BIGINT())
+                        .expectStr(
+                                "((COUNT(DISTINCT `f0`)) + (AVG(DISTINCT 
`f0`))) + (MAX(`f0`))"));
+    }
+
+    /**
+     * Resolves the spec's expression against a schema built from the spec's fields and verifies
+     * that the single resolved expression serializes to the expected SQL string.
+     */
+    @ParameterizedTest
+    @MethodSource("testData")
+    void testSerialization(TestSpec spec) {
+        // The only resolver input: an operation without rows that exposes the spec's columns.
+        final ValuesQueryOperation input =
+                new ValuesQueryOperation(
+                        Collections.emptyList(),
+                        ResolvedSchema.of(new ArrayList<>(spec.columns.values())));
+
+        // The lookups below are stubs that never find anything; the specs only use built-ins.
+        final ExpressionResolver resolver =
+                ExpressionResolver.resolverFor(
+                                TableConfig.getDefault(),
+                                Thread.currentThread().getContextClassLoader(),
+                                name -> Optional.empty(),
+                                new FunctionLookupMock(Collections.emptyMap()),
+                                new DataTypeFactoryMock(),
+                                (sqlExpression, inputRowType, outputType) -> null,
+                                input)
+                        .build();
+
+        final List<ResolvedExpression> resolved =
+                resolver.resolve(Collections.singletonList(spec.expr));
+
+        assertThat(resolved)
+                .hasSize(1)
+                .extracting(ResolvedExpression::asSerializableString)
+                .containsOnly(spec.expectedStr);
+    }
+
+    /**
+     * A single test case: the expression under test, the physical columns it may reference and
+     * the SQL string its resolved form is expected to serialize to.
+     */
+    private static class TestSpec {
+        /** Columns available to the expression, keyed by field name. */
+        private final Map<String, Column> columns = new HashMap<>();
+
+        private final Expression expr;
+        private String expectedStr;
+
+        public TestSpec(Expression expr) {
+            this.expr = expr;
+        }
+
+        /** Starts a spec for the given expression. */
+        public static TestSpec forExpr(Expression expr) {
+            return new TestSpec(expr);
+        }
+
+        /** Registers a physical column; a later call with the same name replaces the earlier one. */
+        public TestSpec withField(String name, DataType dataType) {
+            final Column column = Column.physical(name, dataType);
+            columns.put(name, column);
+            return this;
+        }
+
+        /** Sets the SQL string the serialized expression must equal. */
+        public TestSpec expectStr(String expected) {
+            expectedStr = expected;
+            return this;
+        }
+
+        /** Uses the expression summary as the display name of the parameterized case. */
+        @Override
+        public String toString() {
+            return expr.asSummaryString();
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/CallExpression.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/CallExpression.java
index 24847ae143a..4465856977c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/CallExpression.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/CallExpression.java
@@ -20,13 +20,16 @@ package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.functions.SqlCallSyntax;
 import org.apache.flink.table.module.Module;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -216,6 +219,30 @@ public final class CallExpression implements 
ResolvedExpression {
         return getFunctionName() + argList;
     }
 
+    /**
+     * Renders this call as a SQL string. Built-in functions are unparsed with their own call
+     * syntax and SQL name; every other function is unparsed with {@link SqlCallSyntax#FUNCTION}
+     * and its serializable function name.
+     */
+    @Override
+    public String asSerializableString() {
+        if (!(functionDefinition instanceof BuiltInFunctionDefinition)) {
+            return SqlCallSyntax.FUNCTION.unparse(getSerializableFunctionName(), args);
+        }
+
+        final BuiltInFunctionDefinition builtIn = (BuiltInFunctionDefinition) functionDefinition;
+        final SqlCallSyntax syntax = builtIn.getCallSyntax();
+        return syntax.unparse(builtIn.getSqlName(), args);
+    }
+
+    /**
+     * Returns the name to emit for a non-built-in function: the serializable form of the object
+     * identifier if the function identifier carries one, otherwise the escaped function name.
+     *
+     * @throws TableException if this call has no function identifier
+     */
+    private String getSerializableFunctionName() {
+        if (functionIdentifier != null) {
+            return functionIdentifier
+                    .getIdentifier()
+                    .map(ObjectIdentifier::asSerializableString)
+                    .orElseGet(
+                            () ->
+                                    EncodingUtils.escapeIdentifier(
+                                            functionIdentifier.getFunctionName()));
+        }
+
+        throw new TableException(
+                "Only functions that have been registered before are serializable.");
+    }
+
     @Override
     public List<Expression> getChildren() {
         return Collections.unmodifiableList(this.args);
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/FieldReferenceExpression.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/FieldReferenceExpression.java
index 5afb57ac1a8..42f19ca3d66 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/FieldReferenceExpression.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/FieldReferenceExpression.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
@@ -88,6 +89,11 @@ public final class FieldReferenceExpression implements 
ResolvedExpression {
         return name;
     }
 
+    /** Returns the field name escaped via {@link EncodingUtils#escapeIdentifier(String)}. */
+    @Override
+    public String asSerializableString() {
+        final String escapedName = EncodingUtils.escapeIdentifier(name);
+        return escapedName;
+    }
+
     @Override
     public List<Expression> getChildren() {
         return Collections.emptyList();
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/TypeLiteralExpression.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/TypeLiteralExpression.java
index c303736b42f..61c05ddb138 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/TypeLiteralExpression.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/TypeLiteralExpression.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
@@ -57,6 +58,15 @@ public final class TypeLiteralExpression implements 
ResolvedExpression {
         return dataType.toString();
     }
 
+    @Override
+    public String asSerializableString() {
+        // SQL treats nullability as a constraint on table columns rather than as part of the
+        // type. Serializing a copy created with copy(true) keeps that constraint out of the
+        // string, so the result can be used directly in SQL such as CAST(f0 AS BIGINT).
+        final LogicalType nullableType = dataType.getLogicalType().copy(true);
+        return nullableType.asSerializableString();
+    }
+
     @Override
     public List<Expression> getChildren() {
         return Collections.emptyList();
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
index 81241e2ecb7..fce8c4664fa 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
@@ -69,16 +69,23 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
 
     private final boolean isInternal;
 
+    private final SqlCallSyntax sqlCallSyntax;
+
+    private final String sqlName;
+
     private BuiltInFunctionDefinition(
             String name,
+            String sqlName,
             int version,
             FunctionKind kind,
             TypeInference typeInference,
+            SqlCallSyntax sqlCallSyntax,
             boolean isDeterministic,
             boolean isRuntimeProvided,
             String runtimeClass,
             boolean isInternal) {
         this.name = checkNotNull(name, "Name must not be null.");
+        this.sqlName = sqlName;
         this.version = isInternal ? null : version;
         this.kind = checkNotNull(kind, "Kind must not be null.");
         this.typeInference = checkNotNull(typeInference, "Type inference must 
not be null.");
@@ -86,6 +93,7 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
         this.isRuntimeProvided = isRuntimeProvided;
         this.runtimeClass = runtimeClass;
         this.isInternal = isInternal;
+        this.sqlCallSyntax = sqlCallSyntax;
         validateFunction(this.name, this.version, this.isInternal);
     }
 
@@ -98,6 +106,14 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
         return name;
     }
 
+    /**
+     * Returns the name used when unparsing this function into SQL: the explicitly configured SQL
+     * name if there is one, otherwise {@link #getName()} upper-cased with {@link Locale#ROOT}.
+     */
+    public String getSqlName() {
+        return sqlName != null ? sqlName : getName().toUpperCase(Locale.ROOT);
+    }
+
     public Optional<Integer> getVersion() {
         return Optional.ofNullable(version);
     }
@@ -163,6 +179,10 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
         return typeInference;
     }
 
+    /** Returns the syntax used for unparsing calls of this function into a SQL string. */
+    public SqlCallSyntax getCallSyntax() {
+        return this.sqlCallSyntax;
+    }
+
     @Override
     public boolean isDeterministic() {
         return isDeterministic;
@@ -214,6 +234,8 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
 
         private String name;
 
+        private String sqlName;
+
         private int version = DEFAULT_VERSION;
 
         private FunctionKind kind;
@@ -228,6 +250,8 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
 
         private boolean isInternal = false;
 
+        private SqlCallSyntax sqlCallSyntax = SqlCallSyntax.FUNCTION;
+
         public Builder() {
             // default constructor to allow a fluent definition
         }
@@ -327,12 +351,43 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
             return this;
         }
 
+        /**
+         * Overwrites the syntax used for unparsing a function into a SQL string. If not specified,
+         * {@link SqlCallSyntax#FUNCTION} is used.
+         *
+         * @param syntax the call syntax to use, must not be null
+         * @return this builder
+         * @throws NullPointerException if {@code syntax} is null
+         */
+        public Builder callSyntax(SqlCallSyntax syntax) {
+            // Fail at definition time: a null syntax would otherwise only surface later, when a
+            // call of the function is serialized and the syntax is dereferenced.
+            this.sqlCallSyntax = checkNotNull(syntax, "Call syntax must not be null.");
+            return this;
+        }
+
+        /**
+         * Overwrites both the syntax and the name used for unparsing a function into a SQL string.
+         * If no syntax is specified, {@link SqlCallSyntax#FUNCTION} is used. If the given name is
+         * null, the upper-cased {@link #name(String)} is passed to the {@link SqlCallSyntax}.
+         *
+         * @param name the SQL name to unparse with, may be null to fall back to the function name
+         * @param syntax the call syntax to use, must not be null
+         * @return this builder
+         * @throws NullPointerException if {@code syntax} is null
+         */
+        public Builder callSyntax(String name, SqlCallSyntax syntax) {
+            // Validate before touching any state so a rejected call leaves the builder unchanged.
+            // A null syntax would otherwise only surface later, when a call is serialized.
+            checkNotNull(syntax, "Call syntax must not be null.");
+            this.sqlName = name;
+            this.sqlCallSyntax = syntax;
+            return this;
+        }
+
+        /**
+         * Sets the name that is used for unparsing a function into a SQL string. If not specified,
+         * the upper-cased {@link #name(String)} is used instead.
+         */
+        public Builder sqlName(String name) {
+            sqlName = name;
+            return this;
+        }
+
         public BuiltInFunctionDefinition build() {
             return new BuiltInFunctionDefinition(
                     name,
+                    sqlName,
                     version,
                     kind,
                     typeInferenceBuilder.build(),
+                    sqlCallSyntax,
                     isDeterministic,
                     isRuntimeProvided,
                     runtimeClass,
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 8f12cdd9b05..08a1b13dfeb 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -28,7 +28,9 @@ import org.apache.flink.table.api.JsonQueryWrapper;
 import org.apache.flink.table.api.JsonType;
 import org.apache.flink.table.api.JsonValueOnEmptyOrError;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.expressions.TimePointUnit;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.InputTypeStrategies;
@@ -44,6 +46,7 @@ import 
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
 import org.apache.flink.table.types.logical.TimestampKind;
 import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;
 import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.util.Preconditions;
 
 import java.lang.reflect.Field;
@@ -52,6 +55,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.flink.table.api.DataTypes.BIGINT;
@@ -374,6 +378,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition AND =
             BuiltInFunctionDefinition.newBuilder()
                     .name("and")
+                    .callSyntax("AND", SqlCallSyntax.MULTIPLE_BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             varyingSequence(
@@ -386,6 +391,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition OR =
             BuiltInFunctionDefinition.newBuilder()
                     .name("or")
+                    .callSyntax("OR", SqlCallSyntax.MULTIPLE_BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             varyingSequence(
@@ -398,6 +404,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition NOT =
             BuiltInFunctionDefinition.newBuilder()
                     .name("not")
+                    .callSyntax("NOT", SqlCallSyntax.UNARY_PREFIX_OP)
                     .kind(SCALAR)
                     
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
@@ -406,6 +413,13 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition IF =
             BuiltInFunctionDefinition.newBuilder()
                     .name("ifThenElse")
+                    .callSyntax(
+                            (sqlName, operands) ->
+                                    String.format(
+                                            "CASE WHEN %s THEN %s ELSE %s END",
+                                            
operands.get(0).asSerializableString(),
+                                            
operands.get(1).asSerializableString(),
+                                            
operands.get(2).asSerializableString()))
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             compositeSequence()
@@ -422,6 +436,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition EQUALS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("equals")
+                    .callSyntax("=", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(TWO_EQUALS_COMPARABLE)
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
@@ -430,6 +445,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition GREATER_THAN =
             BuiltInFunctionDefinition.newBuilder()
                     .name("greaterThan")
+                    .callSyntax(">", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(TWO_FULLY_COMPARABLE)
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
@@ -438,6 +454,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition GREATER_THAN_OR_EQUAL =
             BuiltInFunctionDefinition.newBuilder()
                     .name("greaterThanOrEqual")
+                    .callSyntax(">=", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(TWO_FULLY_COMPARABLE)
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
@@ -446,6 +463,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition LESS_THAN =
             BuiltInFunctionDefinition.newBuilder()
                     .name("lessThan")
+                    .callSyntax("<", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(TWO_FULLY_COMPARABLE)
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
@@ -454,6 +472,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition LESS_THAN_OR_EQUAL =
             BuiltInFunctionDefinition.newBuilder()
                     .name("lessThanOrEqual")
+                    .callSyntax("<=", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(TWO_FULLY_COMPARABLE)
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
@@ -462,6 +481,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition NOT_EQUALS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("notEquals")
+                    .callSyntax("<>", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(TWO_EQUALS_COMPARABLE)
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
@@ -470,6 +490,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition IS_NULL =
             BuiltInFunctionDefinition.newBuilder()
                     .name("isNull")
+                    .callSyntax("IS NULL", SqlCallSyntax.UNARY_SUFFIX_OP)
                     .kind(SCALAR)
                     
.inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
                     
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
@@ -478,6 +499,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition IS_NOT_NULL =
             BuiltInFunctionDefinition.newBuilder()
                     .name("isNotNull")
+                    .callSyntax("IS NOT NULL", SqlCallSyntax.UNARY_SUFFIX_OP)
                     .kind(SCALAR)
                     
.inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
                     
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
@@ -486,6 +508,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition IS_TRUE =
             BuiltInFunctionDefinition.newBuilder()
                     .name("isTrue")
+                    .callSyntax("IS TRUE", SqlCallSyntax.UNARY_SUFFIX_OP)
                     .kind(SCALAR)
                     
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
                     
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
@@ -494,6 +517,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition IS_FALSE =
             BuiltInFunctionDefinition.newBuilder()
                     .name("isFalse")
+                    .callSyntax("IS FALSE", SqlCallSyntax.UNARY_SUFFIX_OP)
                     .kind(SCALAR)
                     
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
                     
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
@@ -502,6 +526,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition IS_NOT_TRUE =
             BuiltInFunctionDefinition.newBuilder()
                     .name("isNotTrue")
+                    .callSyntax("IS NOT TRUE", SqlCallSyntax.UNARY_SUFFIX_OP)
                     .kind(SCALAR)
                     
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
                     
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
@@ -510,6 +535,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition IS_NOT_FALSE =
             BuiltInFunctionDefinition.newBuilder()
                     .name("isNotFalse")
+                    .callSyntax("IS NOT FALSE", SqlCallSyntax.UNARY_SUFFIX_OP)
                     .kind(SCALAR)
                     
.inputTypeStrategy(sequence(logical(LogicalTypeRoot.BOOLEAN)))
                     
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
@@ -519,6 +545,13 @@ public final class BuiltInFunctionDefinitions {
             BuiltInFunctionDefinition.newBuilder()
                     .name("between")
                     .kind(SCALAR)
+                    .callSyntax(
+                            (sqlName, operands) ->
+                                    String.format(
+                                            "%s BETWEEN %s AND %s",
+                                            
CallSyntaxUtils.asSerializableOperand(operands.get(0)),
+                                            
CallSyntaxUtils.asSerializableOperand(operands.get(1)),
+                                            
CallSyntaxUtils.asSerializableOperand(operands.get(2))))
                     .inputTypeStrategy(
                             comparable(ConstantArgumentCount.of(3), 
StructuredComparison.FULL))
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
@@ -527,6 +560,13 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition NOT_BETWEEN =
             BuiltInFunctionDefinition.newBuilder()
                     .name("notBetween")
+                    .callSyntax(
+                            (sqlName, operands) ->
+                                    String.format(
+                                            "%s NOT BETWEEN %s AND %s",
+                                            
CallSyntaxUtils.asSerializableOperand(operands.get(0)),
+                                            
CallSyntaxUtils.asSerializableOperand(operands.get(1)),
+                                            
CallSyntaxUtils.asSerializableOperand(operands.get(2))))
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             comparable(ConstantArgumentCount.of(3), 
StructuredComparison.FULL))
@@ -679,6 +719,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition DISTINCT =
             BuiltInFunctionDefinition.newBuilder()
                     .name("distinct")
+                    .callSyntax(SqlCallSyntax.DISTINCT)
                     .kind(AGGREGATE)
                     .inputTypeStrategy(sequence(ANY))
                     .outputTypeStrategy(argument(0))
@@ -707,6 +748,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition LIKE =
             BuiltInFunctionDefinition.newBuilder()
                     .name("like")
+                    .callSyntax("LIKE", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             sequence(
@@ -736,6 +778,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition SIMILAR =
             BuiltInFunctionDefinition.newBuilder()
                     .name("similar")
+                    .callSyntax("SIMILAR TO", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             sequence(
@@ -747,6 +790,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition SUBSTRING =
             BuiltInFunctionDefinition.newBuilder()
                     .name("substring")
+                    .callSyntax("SUBSTRING", SqlCallSyntax.SUBSTRING)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -763,6 +807,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition SUBSTR =
             BuiltInFunctionDefinition.newBuilder()
                     .name("substr")
+                    .callSyntax("SUBSTR", SqlCallSyntax.SUBSTRING)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -791,6 +836,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition TRIM =
             BuiltInFunctionDefinition.newBuilder()
                     .name("trim")
+                    .callSyntax(SqlCallSyntax.TRIM)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             sequence(
@@ -822,6 +868,12 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition POSITION =
             BuiltInFunctionDefinition.newBuilder()
                     .name("position")
+                    .callSyntax(
+                            (sqlName, operands) ->
+                                    String.format(
+                                            "POSITION(%s IN %s)",
+                                            
operands.get(0).asSerializableString(),
+                                            
operands.get(1).asSerializableString()))
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             sequence(
@@ -833,6 +885,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition OVERLAY =
             BuiltInFunctionDefinition.newBuilder()
                     .name("overlay")
+                    .callSyntax(SqlCallSyntax.OVERLAY)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -901,6 +954,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition REGEXP_EXTRACT =
             BuiltInFunctionDefinition.newBuilder()
                     .name("regexpExtract")
+                    .sqlName("REGEXP_EXTRACT")
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -1150,6 +1204,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition PLUS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("plus")
+                    .callSyntax("+", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -1220,6 +1275,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition MINUS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("minus")
+                    .callSyntax("-", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -1248,6 +1304,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition AGG_DECIMAL_MINUS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("AGG_DECIMAL_MINUS")
+                    .callSyntax("-", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             sequence(
@@ -1260,6 +1317,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition DIVIDE =
             BuiltInFunctionDefinition.newBuilder()
                     .name("divide")
+                    .callSyntax("/", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -1280,6 +1338,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition TIMES =
             BuiltInFunctionDefinition.newBuilder()
                     .name("times")
+                    .callSyntax("*", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -1322,6 +1381,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition FLOOR =
             BuiltInFunctionDefinition.newBuilder()
                     .name("floor")
+                    .callSyntax("FLOOR", SqlCallSyntax.FLOOR_OR_CEIL)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -1338,6 +1398,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition CEIL =
             BuiltInFunctionDefinition.newBuilder()
                     .name("ceil")
+                    .callSyntax("CEIL", SqlCallSyntax.FLOOR_OR_CEIL)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -1402,6 +1463,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition MOD =
             BuiltInFunctionDefinition.newBuilder()
                     .name("mod")
+                    .callSyntax("%", SqlCallSyntax.BINARY_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             sequence(
@@ -1422,6 +1484,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition MINUS_PREFIX =
             BuiltInFunctionDefinition.newBuilder()
                     .name("minusPrefix")
+                    .callSyntax("-", SqlCallSyntax.UNARY_PREFIX_OP)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -1649,6 +1712,17 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition EXTRACT =
             BuiltInFunctionDefinition.newBuilder()
                     .name("extract")
+                    .callSyntax(
+                            "EXTRACT",
+                            (sqlName, operands) ->
+                                    String.format(
+                                            "%s(%s %s %s)",
+                                            sqlName,
+                                            ((ValueLiteralExpression) 
operands.get(0))
+                                                    
.getValueAs(TimeIntervalUnit.class)
+                                                    .get(),
+                                            "FROM",
+                                            
operands.get(1).asSerializableString()))
                     .kind(SCALAR)
                     .inputTypeStrategy(SpecificInputTypeStrategies.EXTRACT)
                     .outputTypeStrategy(nullableIfArgs(explicit(BIGINT())))
@@ -1707,6 +1781,14 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition TEMPORAL_OVERLAPS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("temporalOverlaps")
+                    .callSyntax(
+                            (sqlName, operands) ->
+                                    String.format(
+                                            "(%s, %s) OVERLAPS (%s, %s)",
+                                            
operands.get(0).asSerializableString(),
+                                            
operands.get(1).asSerializableString(),
+                                            
operands.get(2).asSerializableString(),
+                                            
operands.get(3).asSerializableString()))
                     .kind(SCALAR)
                     
.inputTypeStrategy(SpecificInputTypeStrategies.TEMPORAL_OVERLAPS)
                     .outputTypeStrategy(nullableIfArgs(explicit(BOOLEAN())))
@@ -1830,6 +1912,12 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition AT =
             BuiltInFunctionDefinition.newBuilder()
                     .name("at")
+                    .callSyntax(
+                            (sqlName, operands) ->
+                                    String.format(
+                                            "%s[%s]",
+                                            
operands.get(0).asSerializableString(),
+                                            
operands.get(1).asSerializableString()))
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             sequence(
@@ -1855,6 +1943,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition ARRAY =
             BuiltInFunctionDefinition.newBuilder()
                     .name("array")
+                    .callSyntax("ARRAY", SqlCallSyntax.COLLECTION_CTOR)
                     .kind(SCALAR)
                     .inputTypeStrategy(SpecificInputTypeStrategies.ARRAY)
                     .outputTypeStrategy(SpecificTypeStrategies.ARRAY)
@@ -1871,6 +1960,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition MAP =
             BuiltInFunctionDefinition.newBuilder()
                     .name("map")
+                    .callSyntax("MAP", SqlCallSyntax.COLLECTION_CTOR)
                     .kind(SCALAR)
                     .inputTypeStrategy(SpecificInputTypeStrategies.MAP)
                     .outputTypeStrategy(SpecificTypeStrategies.MAP)
@@ -1904,6 +1994,25 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition GET =
             BuiltInFunctionDefinition.newBuilder()
                     .name("get")
+                    .callSyntax(
+                            (sqlName, operands) -> {
+                                final Optional<String> fieldName =
+                                        ((ValueLiteralExpression) 
operands.get(1))
+                                                .getValueAs(String.class);
+
+                                return fieldName
+                                        .map(
+                                                n ->
+                                                        String.format(
+                                                                "%s.%s",
+                                                                operands.get(0)
+                                                                        
.asSerializableString(),
+                                                                
EncodingUtils.escapeIdentifier(n)))
+                                        .orElseGet(
+                                                () ->
+                                                        
SqlCallSyntax.FUNCTION.unparse(
+                                                                sqlName, 
operands));
+                            })
                     .kind(OTHER)
                     .inputTypeStrategy(
                             sequence(
@@ -2092,6 +2201,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition IS_JSON =
             BuiltInFunctionDefinition.newBuilder()
                     .name("IS_JSON")
+                    .callSyntax(JsonFunctionsCallSyntax.IS_JSON)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -2099,13 +2209,14 @@ public final class BuiltInFunctionDefinitions {
                                     sequence(
                                             
logical(LogicalTypeFamily.CHARACTER_STRING),
                                             symbol(JsonType.class))))
-                    
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
+                    .outputTypeStrategy(explicit(BOOLEAN().notNull()))
                     .runtimeDeferred()
                     .build();
 
     public static final BuiltInFunctionDefinition JSON_EXISTS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("JSON_EXISTS")
+                    .callSyntax("JSON_EXISTS", 
JsonFunctionsCallSyntax.JSON_EXISTS)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             or(
@@ -2120,13 +2231,14 @@ public final class BuiltInFunctionDefinitions {
                                                     
logical(LogicalTypeFamily.CHARACTER_STRING),
                                                     LITERAL),
                                             symbol(JsonExistsOnError.class))))
-                    
.outputTypeStrategy(explicit(DataTypes.BOOLEAN().nullable()))
+                    .outputTypeStrategy(explicit(BOOLEAN().nullable()))
                     .runtimeDeferred()
                     .build();
 
     public static final BuiltInFunctionDefinition JSON_VALUE =
             BuiltInFunctionDefinition.newBuilder()
                     .name("JSON_VALUE")
+                    .callSyntax("JSON_VALUE", 
JsonFunctionsCallSyntax.JSON_VALUE)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             sequence(
@@ -2145,6 +2257,7 @@ public final class BuiltInFunctionDefinitions {
             BuiltInFunctionDefinition.newBuilder()
                     .name("JSON_QUERY")
                     .kind(SCALAR)
+                    .callSyntax(JsonFunctionsCallSyntax.JSON_QUERY)
                     .inputTypeStrategy(
                             sequence(
                                     
logical(LogicalTypeFamily.CHARACTER_STRING),
@@ -2168,6 +2281,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition JSON_OBJECT =
             BuiltInFunctionDefinition.newBuilder()
                     .name("JSON_OBJECT")
+                    .callSyntax(JsonFunctionsCallSyntax.JSON_OBJECT)
                     .kind(SCALAR)
                     .inputTypeStrategy(SpecificInputTypeStrategies.JSON_OBJECT)
                     .outputTypeStrategy(explicit(DataTypes.STRING().notNull()))
@@ -2177,6 +2291,9 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition JSON_OBJECTAGG_NULL_ON_NULL =
             BuiltInFunctionDefinition.newBuilder()
                     .name("JSON_OBJECTAGG_NULL_ON_NULL")
+                    .callSyntax(
+                            "JSON_OBJECTAGG",
+                            
JsonFunctionsCallSyntax.jsonObjectAgg(JsonOnNull.NULL))
                     .kind(AGGREGATE)
                     .inputTypeStrategy(
                             
sequence(logical(LogicalTypeFamily.CHARACTER_STRING), JSON_ARGUMENT))
@@ -2187,6 +2304,9 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition 
JSON_OBJECTAGG_ABSENT_ON_NULL =
             BuiltInFunctionDefinition.newBuilder()
                     .name("JSON_OBJECTAGG_ABSENT_ON_NULL")
+                    .callSyntax(
+                            "JSON_OBJECTAGG",
+                            
JsonFunctionsCallSyntax.jsonObjectAgg(JsonOnNull.ABSENT))
                     .kind(AGGREGATE)
                     .inputTypeStrategy(
                             
sequence(logical(LogicalTypeFamily.CHARACTER_STRING), JSON_ARGUMENT))
@@ -2197,6 +2317,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition JSON_ARRAY =
             BuiltInFunctionDefinition.newBuilder()
                     .name("JSON_ARRAY")
+                    .callSyntax(JsonFunctionsCallSyntax.JSON_ARRAY)
                     .kind(SCALAR)
                     .inputTypeStrategy(
                             InputTypeStrategies.varyingSequence(
@@ -2209,6 +2330,8 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition JSON_ARRAYAGG_NULL_ON_NULL =
             BuiltInFunctionDefinition.newBuilder()
                     .name("JSON_ARRAYAGG_NULL_ON_NULL")
+                    .callSyntax(
+                            "JSON_ARRAYAGG", 
JsonFunctionsCallSyntax.jsonArrayAgg(JsonOnNull.NULL))
                     .kind(AGGREGATE)
                     .inputTypeStrategy(sequence(JSON_ARGUMENT))
                     .outputTypeStrategy(explicit(DataTypes.STRING().notNull()))
@@ -2218,6 +2341,9 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition JSON_ARRAYAGG_ABSENT_ON_NULL 
=
             BuiltInFunctionDefinition.newBuilder()
                     .name("JSON_ARRAYAGG_ABSENT_ON_NULL")
+                    .callSyntax(
+                            "JSON_ARRAYAGG",
+                            
JsonFunctionsCallSyntax.jsonArrayAgg(JsonOnNull.ABSENT))
                     .kind(AGGREGATE)
                     .inputTypeStrategy(sequence(JSON_ARGUMENT))
                     .outputTypeStrategy(explicit(DataTypes.STRING().notNull()))
@@ -2232,6 +2358,7 @@ public final class BuiltInFunctionDefinitions {
             BuiltInFunctionDefinition.newBuilder()
                     .name("in")
                     .kind(SCALAR)
+                    .callSyntax("IN", SqlCallSyntax.IN)
                     .inputTypeStrategy(SpecificInputTypeStrategies.IN)
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
                     .build();
@@ -2239,6 +2366,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition CAST =
             BuiltInFunctionDefinition.newBuilder()
                     .name("cast")
+                    .callSyntax("CAST", SqlCallSyntax.CAST)
                     .kind(SCALAR)
                     .inputTypeStrategy(SpecificInputTypeStrategies.CAST)
                     .outputTypeStrategy(
@@ -2248,6 +2376,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition TRY_CAST =
             BuiltInFunctionDefinition.newBuilder()
                     .name("TRY_CAST")
+                    .callSyntax("TRY_CAST", SqlCallSyntax.CAST)
                     .kind(SCALAR)
                     .inputTypeStrategy(SpecificInputTypeStrategies.CAST)
                     
.outputTypeStrategy(forceNullable(TypeStrategies.argument(1)))
@@ -2256,6 +2385,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition REINTERPRET_CAST =
             BuiltInFunctionDefinition.newBuilder()
                     .name("reinterpretCast")
+                    .callSyntax("REINTERPRET_CAST", SqlCallSyntax.CAST)
                     .kind(SCALAR)
                     
.inputTypeStrategy(SpecificInputTypeStrategies.REINTERPRET_CAST)
                     .outputTypeStrategy(TypeStrategies.argument(1))
@@ -2264,6 +2394,7 @@ public final class BuiltInFunctionDefinitions {
     public static final BuiltInFunctionDefinition AS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("as")
+                    .callSyntax("AS", SqlCallSyntax.AS)
                     .kind(OTHER)
                     .inputTypeStrategy(
                             varyingSequence(
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/CallSyntaxUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/CallSyntaxUtils.java
new file mode 100644
index 00000000000..d285db68be0
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/CallSyntaxUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TableSymbol;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+
+/** Utility functions that can be used for writing {@link SqlCallSyntax}. */
+@Internal
+class CallSyntaxUtils {
+
+    /**
+     * Converts the given {@link ResolvedExpression} into a SQL string. Wraps 
the string with
+     * parentheses if the expression is not a leaf expression such as 
{@link
+     * ValueLiteralExpression} or {@link FieldReferenceExpression}.
+     */
+    static String asSerializableOperand(ResolvedExpression expression) {
+        if (expression.getResolvedChildren().isEmpty()) {
+            return expression.asSerializableString();
+        }
+
+        return String.format("(%s)", expression.asSerializableString());
+    }
+
+    static <T extends TableSymbol> T getSymbolLiteral(ResolvedExpression 
operands, Class<T> clazz) {
+        return ((ValueLiteralExpression) operands).getValueAs(clazz).get();
+    }
+
+    private CallSyntaxUtils() {}
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/JsonFunctionsCallSyntax.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/JsonFunctionsCallSyntax.java
new file mode 100644
index 00000000000..f60b9cddcdd
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/JsonFunctionsCallSyntax.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.JsonExistsOnError;
+import org.apache.flink.table.api.JsonOnNull;
+import org.apache.flink.table.api.JsonQueryOnEmptyOrError;
+import org.apache.flink.table.api.JsonQueryWrapper;
+import org.apache.flink.table.api.JsonType;
+import org.apache.flink.table.api.JsonValueOnEmptyOrError;
+import org.apache.flink.table.expressions.ResolvedExpression;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.table.functions.CallSyntaxUtils.getSymbolLiteral;
+
+/** Implementations of {@link SqlCallSyntax} specific for JSON functions. */
+@Internal
+class JsonFunctionsCallSyntax {
+
+    static final SqlCallSyntax IS_JSON =
+            (sqlName, operands) -> {
+                final String s =
+                        String.format(
+                                "%s IS JSON",
+                                
CallSyntaxUtils.asSerializableOperand(operands.get(0)));
+                if (operands.size() > 1) {
+                    return s + " " + getSymbolLiteral(operands.get(1), 
JsonType.class);
+                }
+
+                return s;
+            };
+
+    static final SqlCallSyntax JSON_VALUE =
+            (sqlName, operands) -> {
+                StringBuilder s =
+                        new StringBuilder(
+                                String.format(
+                                        "JSON_VALUE(%s, %s RETURNING %s ",
+                                        operands.get(0).asSerializableString(),
+                                        operands.get(1).asSerializableString(),
+                                        
operands.get(2).asSerializableString()));
+
+                final JsonValueOnEmptyOrError onEmpty =
+                        getSymbolLiteral(operands.get(3), 
JsonValueOnEmptyOrError.class);
+
+                if (onEmpty == JsonValueOnEmptyOrError.DEFAULT) {
+                    s.append(String.format("DEFAULT %s", 
operands.get(4).asSerializableString()));
+                } else {
+                    s.append(onEmpty);
+                }
+                s.append(" ON EMPTY ");
+
+                final JsonValueOnEmptyOrError onError =
+                        getSymbolLiteral(operands.get(5), 
JsonValueOnEmptyOrError.class);
+
+                if (onError == JsonValueOnEmptyOrError.DEFAULT) {
+                    s.append(String.format("DEFAULT %s", 
operands.get(6).asSerializableString()));
+                } else {
+                    s.append(onError);
+                }
+                s.append(" ON ERROR)");
+
+                return s.toString();
+            };
+
+    static final SqlCallSyntax JSON_EXISTS =
+            (sqlName, operands) -> {
+                if (operands.size() == 3) {
+                    return String.format(
+                            "%s(%s, %s %s ON ERROR)",
+                            sqlName,
+                            operands.get(0).asSerializableString(),
+                            operands.get(1).asSerializableString(),
+                            getSymbolLiteral(operands.get(2), 
JsonExistsOnError.class));
+                } else {
+                    return SqlCallSyntax.FUNCTION.unparse(sqlName, operands);
+                }
+            };
+
+    static final SqlCallSyntax JSON_QUERY =
+            (sqlName, operands) -> {
+                final JsonQueryWrapper wrapper =
+                        getSymbolLiteral(operands.get(2), 
JsonQueryWrapper.class);
+                final JsonQueryOnEmptyOrError onEmpty =
+                        getSymbolLiteral(operands.get(3), 
JsonQueryOnEmptyOrError.class);
+                final JsonQueryOnEmptyOrError onError =
+                        getSymbolLiteral(operands.get(4), 
JsonQueryOnEmptyOrError.class);
+
+                return String.format(
+                        "JSON_QUERY(%s, %s %s WRAPPER %s ON EMPTY %s ON 
ERROR)",
+                        operands.get(0).asSerializableString(),
+                        operands.get(1).asSerializableString(),
+                        toString(wrapper),
+                        onEmpty.toString().replaceAll("_", " "),
+                        onError.toString().replaceAll("_", " "));
+            };
+
+    static final SqlCallSyntax JSON_OBJECT =
+            (sqlName, operands) -> {
+                final String entries =
+                        IntStream.range(0, operands.size() / 2)
+                                .mapToObj(
+                                        i ->
+                                                String.format(
+                                                        "KEY %s VALUE %s",
+                                                        operands.get(2 * i + 1)
+                                                                
.asSerializableString(),
+                                                        operands.get(2 * i + 2)
+                                                                
.asSerializableString()))
+                                .collect(Collectors.joining(", "));
+
+                final JsonOnNull onNull = getSymbolLiteral(operands.get(0), 
JsonOnNull.class);
+                return String.format("JSON_OBJECT(%s %s ON NULL)", entries, 
onNull);
+            };
+
+    static final SqlCallSyntax JSON_ARRAY =
+            (sqlName, operands) -> {
+                if (operands.size() == 1) {
+                    return "JSON_ARRAY()";
+                }
+                final String entries =
+                        operands.subList(1, operands.size()).stream()
+                                .map(ResolvedExpression::asSerializableString)
+                                .collect(Collectors.joining(", "));
+
+                final JsonOnNull onNull = getSymbolLiteral(operands.get(0), 
JsonOnNull.class);
+                return String.format("JSON_ARRAY(%s %s ON NULL)", entries, 
onNull);
+            };
+
+    static SqlCallSyntax jsonArrayAgg(JsonOnNull onNull) {
+        return (sqlName, operands) ->
+                String.format(
+                        "%s(%s %s ON NULL)",
+                        sqlName, operands.get(0).asSerializableString(), 
onNull);
+    }
+
+    static SqlCallSyntax jsonObjectAgg(JsonOnNull onNull) {
+        return (sqlName, operands) ->
+                String.format(
+                        "%s(KEY %s VALUE %s %s ON NULL)",
+                        sqlName,
+                        operands.get(0).asSerializableString(),
+                        operands.get(1).asSerializableString(),
+                        onNull);
+    }
+
+    private static String toString(JsonQueryWrapper wrapper) {
+        final String wrapperStr;
+        switch (wrapper) {
+            case WITHOUT_ARRAY:
+                wrapperStr = "WITHOUT ARRAY";
+                break;
+            case CONDITIONAL_ARRAY:
+                wrapperStr = "WITH CONDITIONAL ARRAY";
+                break;
+            case UNCONDITIONAL_ARRAY:
+                wrapperStr = "WITH UNCONDITIONAL ARRAY";
+                break;
+            default:
+                throw new IllegalStateException("Unexpected value: " + 
wrapper);
+        }
+        return wrapperStr;
+    }
+
+    private JsonFunctionsCallSyntax() {}
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/SqlCallSyntax.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/SqlCallSyntax.java
new file mode 100644
index 00000000000..785d0657658
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/SqlCallSyntax.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TimeIntervalUnit;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Provides a format for unparsing {@link BuiltInFunctionDefinitions} into a 
SQL string. */
+@Internal
+public interface SqlCallSyntax {
+
+    String unparse(String sqlName, List<ResolvedExpression> operands);
+
+    /**
+     * Special case for aggregate functions, which can have a DISTINCT 
function applied. Called only
+     * from the DISTINCT function.
+     */
+    default String unparseDistinct(String sqlName, List<ResolvedExpression> 
operands) {
+        throw new UnsupportedOperationException(
+                "Only the FUNCTION syntax supports the DISTINCT clause.");
+    }
+
+    /** Function syntax, as in "Foo(x, y)". */
+    SqlCallSyntax FUNCTION =
+            new SqlCallSyntax() {
+                @Override
+                public String unparse(String sqlName, List<ResolvedExpression> 
operands) {
+                    return doUnParse(sqlName, operands, false);
+                }
+
+                @Override
+                public String unparseDistinct(String sqlName, 
List<ResolvedExpression> operands) {
+                    return doUnParse(sqlName, operands, true);
+                }
+
+                private String doUnParse(
+                        String sqlName, List<ResolvedExpression> operands, 
boolean isDistinct) {
+                    return String.format(
+                            "%s(%s%s)",
+                            sqlName,
+                            isDistinct ? "DISTINCT " : "",
+                            operands.stream()
+                                    
.map(ResolvedExpression::asSerializableString)
+                                    .collect(Collectors.joining(", ")));
+                }
+            };
+
+    /**
+     * Function syntax for handling DISTINCT aggregates. Special case. It does 
not have a syntax
+     * itself, but modifies the syntax of the nested call.
+     */
+    SqlCallSyntax DISTINCT =
+            (sqlName, operands) -> {
+                final CallExpression callExpression = (CallExpression) 
operands.get(0);
+                if (callExpression.getFunctionDefinition() instanceof 
BuiltInFunctionDefinition) {
+                    final BuiltInFunctionDefinition builtinDefinition =
+                            (BuiltInFunctionDefinition) 
callExpression.getFunctionDefinition();
+                    return builtinDefinition
+                            .getCallSyntax()
+                            .unparseDistinct(
+                                    builtinDefinition.getSqlName(),
+                                    callExpression.getResolvedChildren());
+                } else {
+                    return SqlCallSyntax.FUNCTION.unparseDistinct(
+                            callExpression.getFunctionName(), 
callExpression.getResolvedChildren());
+                }
+            };
+
+    /** Function syntax for collection constructors, such as ARRAY[1, 2, 3] or 
MAP['a', 1, 'b', 2]. */
+    SqlCallSyntax COLLECTION_CTOR =
+            (sqlName, operands) ->
+                    String.format(
+                            "%s[%s]",
+                            sqlName,
+                            operands.stream()
+                                    
.map(ResolvedExpression::asSerializableString)
+                                    .collect(Collectors.joining(", ")));
+
+    /** Binary operator syntax, as in "x + y". */
+    SqlCallSyntax BINARY_OP =
+            (sqlName, operands) ->
+                    String.format(
+                            "%s %s %s",
+                            
CallSyntaxUtils.asSerializableOperand(operands.get(0)),
+                            sqlName,
+                            
CallSyntaxUtils.asSerializableOperand(operands.get(1)));
+
+    /**
+     * Binary operator syntax that in Table API can accept multiple operands, 
as in "x AND y AND t
+     * AND w".
+     */
+    SqlCallSyntax MULTIPLE_BINARY_OP =
+            (sqlName, operands) ->
+                    operands.stream()
+                            .map(CallSyntaxUtils::asSerializableOperand)
+                            .collect(Collectors.joining(String.format(" %s ", 
sqlName)));
+
+    /** Postfix unary operator syntax, as in "x ++". */
+    SqlCallSyntax UNARY_SUFFIX_OP =
+            (sqlName, operands) ->
+                    String.format(
+                            "%s %s",
+                            
CallSyntaxUtils.asSerializableOperand(operands.get(0)), sqlName);
+
+    /** Prefix unary operator syntax, as in "- x". */
+    SqlCallSyntax UNARY_PREFIX_OP =
+            (sqlName, operands) ->
+                    String.format(
+                            "%s %s",
+                            sqlName, 
CallSyntaxUtils.asSerializableOperand(operands.get(0)));
+
+    /**
+     * Special sql syntax for CAST operators (CAST, TRY_CAST, 
REINTERPRET_CAST).
+     *
+     * <p>Example: CAST(123 AS STRING)
+     */
+    SqlCallSyntax CAST =
+            (sqlName, operands) ->
+                    String.format(
+                            "%s(%s AS %s)",
+                            sqlName,
+                            operands.get(0).asSerializableString(),
+                            operands.get(1).asSerializableString());
+
+    /**
+     * Special sql syntax for SUBSTRING operators (SUBSTRING, SUBSTR).
+     *
+     * <p>Example: SUBSTRING('abcdef' FROM 2 FOR 3)
+     */
+    SqlCallSyntax SUBSTRING =
+            (sqlName, operands) -> {
+                final String s =
+                        String.format(
+                                "%s(%s FROM %s",
+                                sqlName,
+                                operands.get(0).asSerializableString(),
+                                operands.get(1).asSerializableString());
+                if (operands.size() == 3) {
+                    return s + String.format(" FOR %s)", 
operands.get(2).asSerializableString());
+                }
+
+                return s + ")";
+            };
+
+    /**
+     * Special sql syntax for FLOOR and CEIL.
+     *
+     * <p>Examples:
+     *
+     * <ul>
+     *   <li>FLOOR(TIME '12:44:31' TO MINUTE)
+     *   <li>FLOOR(123)
+     * </ul>
+     */
+    SqlCallSyntax FLOOR_OR_CEIL =
+            (sqlName, operands) -> {
+                if (operands.size() == 1) {
+                    // case for numeric floor & ceil
+                    return SqlCallSyntax.FUNCTION.unparse(sqlName, operands);
+                } else {
+                    // case for flooring/ceiling to temporal units
+                    return String.format(
+                            "%s(%s TO %s)",
+                            sqlName,
+                            operands.get(0).asSerializableString(),
+                            ((ValueLiteralExpression) operands.get(1))
+                                    .getValueAs(TimeIntervalUnit.class)
+                                    .get());
+                }
+            };
+
+    /**
+     * Special sql syntax for TRIM.
+     *
+     * <p>Example: TRIM BOTH ' ' FROM ' abc '
+     */
+    SqlCallSyntax TRIM =
+            (sqlName, operands) -> {
+                final boolean trimLeading =
+                        ((ValueLiteralExpression) 
operands.get(0)).getValueAs(Boolean.class).get();
+                final boolean trimTrailing =
+                        ((ValueLiteralExpression) 
operands.get(1)).getValueAs(Boolean.class).get();
+                final String format;
+
+                // leading & trailing is translated to BOTH
+                if (trimLeading && trimTrailing) {
+                    format = "TRIM BOTH %s FROM %s";
+                } else if (trimLeading) {
+                    format = "TRIM LEADING %s FROM %s";
+                } else if (trimTrailing) {
+                    format = "TRIM TRAILING %s FROM %s";
+                } else {
+                    format = "TRIM %s FROM %s";
+                }
+
+                return String.format(
+                        format,
+                        operands.get(2).asSerializableString(),
+                        operands.get(3).asSerializableString());
+            };
+
+    /**
+     * Special sql syntax for OVERLAY.
+     *
+     * <p>Example: OVERLAY('abcd' PLACING 'def' FROM 3 FOR 2)
+     */
+    SqlCallSyntax OVERLAY =
+            (sqlName, operands) -> {
+                final String s =
+                        String.format(
+                                "OVERLAY(%s PLACING %s FROM %s",
+                                operands.get(0).asSerializableString(),
+                                operands.get(1).asSerializableString(),
+                                operands.get(2).asSerializableString());
+
+                // optional length
+                if (operands.size() == 4) {
+                    return s + String.format(" FOR %s)", 
operands.get(3).asSerializableString());
+                }
+
+                return s + ")";
+            };
+
+    /** Special sql syntax for AS. The string literal is formatted as an 
identifier. */
+    SqlCallSyntax AS =
+            (sqlName, operands) -> {
+                if (operands.size() != 2) {
+                    throw new TableException(
+                            "The AS function with multiple aliases is not SQL"
+                                    + " serializable. It should've been 
flattened during expression"
+                                    + " resolution.");
+                }
+                final String identifier =
+                        ((ValueLiteralExpression) 
operands.get(1)).getValueAs(String.class).get();
+                return String.format(
+                        "%s %s %s",
+                        CallSyntaxUtils.asSerializableOperand(operands.get(0)),
+                        sqlName,
+                        EncodingUtils.escapeIdentifier(identifier));
+            };
+
+    /** Call syntax for {@link BuiltInFunctionDefinitions#IN}. */
+    SqlCallSyntax IN =
+            (sqlName, operands) ->
+                    String.format(
+                            "%s IN (%s)",
+                            operands.get(0).asSerializableString(),
+                            operands.subList(1, operands.size()).stream()
+                                    
.map(ResolvedExpression::asSerializableString)
+                                    .collect(Collectors.joining(", ")));
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
index e4e5d595e74..dd05dad760a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
@@ -31,7 +31,11 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.operations.AggregateQueryOperation;
+import org.apache.flink.table.operations.ProjectQueryOperation;
 import org.apache.flink.table.planner.factories.TableFactoryHarness;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -40,6 +44,7 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.function.Executable;
@@ -51,10 +56,12 @@ import org.junit.jupiter.params.provider.MethodSource;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static 
org.apache.flink.runtime.state.StateBackendLoader.HASHMAP_STATE_BACKEND_NAME;
@@ -69,7 +76,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 abstract class BuiltInAggregateFunctionTestBase {
 
     @RegisterExtension
-    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new 
MiniClusterExtension();
+    public static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new 
MiniClusterExtension();
 
     abstract Stream<TestSpec> getTestCaseSpecs();
 
@@ -145,6 +152,27 @@ abstract class BuiltInAggregateFunctionTestBase {
 
     // 
---------------------------------------------------------------------------------------------
 
+    protected static final class TableApiAggSpec {
+        private final List<Expression> selectExpr;
+        private final List<Expression> groupByExpr;
+
+        public TableApiAggSpec(List<Expression> selectExpr, List<Expression> 
groupByExpr) {
+            this.selectExpr = selectExpr;
+            this.groupByExpr = groupByExpr;
+        }
+
+        public static TableApiAggSpec groupBySelect(
+                List<Expression> groupByExpr, Expression... selectExpr) {
+            return new TableApiAggSpec(
+                    Arrays.stream(selectExpr).collect(Collectors.toList()), 
groupByExpr);
+        }
+
+        public static TableApiAggSpec select(Expression... selectExpr) {
+            return new TableApiAggSpec(
+                    Arrays.stream(selectExpr).collect(Collectors.toList()), 
null);
+        }
+    }
+
     /** Test specification. */
     protected static class TestSpec {
 
@@ -182,16 +210,29 @@ abstract class BuiltInAggregateFunctionTestBase {
         }
 
         TestSpec testApiResult(
-                Function<Table, Table> tableApiSpec,
+                List<Expression> selectExpr,
+                List<Expression> groupByExpr,
                 DataType expectedRowType,
                 List<Row> expectedRows) {
-            this.testItems.add(new TableApiTestItem(tableApiSpec, 
expectedRowType, expectedRows));
+            this.testItems.add(
+                    new TableApiTestItem(selectExpr, groupByExpr, 
expectedRowType, expectedRows));
+            return this;
+        }
+
+        TestSpec testApiSqlResult(
+                List<Expression> selectExpr,
+                List<Expression> groupByExpr,
+                DataType expectedRowType,
+                List<Row> expectedRows) {
+            this.testItems.add(
+                    new TableApiSqlResultTestItem(
+                            selectExpr, groupByExpr, expectedRowType, 
expectedRows));
             return this;
         }
 
         TestSpec testResult(
                 Function<Table, String> sqlSpec,
-                Function<Table, Table> tableApiSpec,
+                TableApiAggSpec tableApiSpec,
                 DataType expectedRowType,
                 List<Row> expectedRows) {
             return testResult(
@@ -200,12 +241,21 @@ abstract class BuiltInAggregateFunctionTestBase {
 
         TestSpec testResult(
                 Function<Table, String> sqlSpec,
-                Function<Table, Table> tableApiSpec,
+                TableApiAggSpec tableApiSpec,
                 DataType expectedSqlRowType,
                 DataType expectedTableApiRowType,
                 List<Row> expectedRows) {
             testSqlResult(sqlSpec, expectedSqlRowType, expectedRows);
-            testApiResult(tableApiSpec, expectedTableApiRowType, expectedRows);
+            testApiResult(
+                    tableApiSpec.selectExpr,
+                    tableApiSpec.groupByExpr,
+                    expectedTableApiRowType,
+                    expectedRows);
+            testApiSqlResult(
+                    tableApiSpec.selectExpr,
+                    tableApiSpec.groupByExpr,
+                    expectedSqlRowType,
+                    expectedRows);
             return this;
         }
 
@@ -312,19 +362,134 @@ abstract class BuiltInAggregateFunctionTestBase {
     }
 
     private static class TableApiTestItem extends SuccessItem {
-        private final Function<Table, Table> spec;
+        private final List<Expression> selectExpr;
+        private final List<Expression> groupByExpr;
 
         public TableApiTestItem(
-                Function<Table, Table> spec,
+                List<Expression> selectExpr,
+                @Nullable List<Expression> groupByExpr,
                 @Nullable DataType expectedRowType,
                 @Nullable List<Row> expectedRows) {
             super(expectedRowType, expectedRows);
-            this.spec = spec;
+            this.selectExpr = selectExpr;
+            this.groupByExpr = groupByExpr;
+        }
+
+        @Override
+        protected TableResult getResult(TableEnvironment tEnv, Table sourceTable) {
+            if (groupByExpr != null) {
+                return sourceTable
+                        .groupBy(groupByExpr.toArray(new Expression[0]))
+                        .select(selectExpr.toArray(new Expression[0]))
+                        .execute();
+            } else {
+                return sourceTable.select(selectExpr.toArray(new Expression[0])).execute();
+            }
+        }
+    }
+
+    private static class TableApiSqlResultTestItem extends SuccessItem {
+
+        private final List<Expression> selectExpr;
+        private final List<Expression> groupByExpr;
+
+        public TableApiSqlResultTestItem(
+                List<Expression> selectExpr,
+                @Nullable List<Expression> groupByExpr,
+                @Nullable DataType expectedRowType,
+                @Nullable List<Row> expectedRows) {
+            super(expectedRowType, expectedRows);
+            this.selectExpr = selectExpr;
+            this.groupByExpr = groupByExpr;
         }
 
         @Override
         protected TableResult getResult(TableEnvironment tEnv, Table sourceTable) {
-            return spec.apply(sourceTable).execute();
+            final Table select;
+            if (groupByExpr != null) {
+                select =
+                        sourceTable
+                                .groupBy(groupByExpr.toArray(new Expression[0]))
+                                .select(selectExpr.toArray(new Expression[0]));
+
+            } else {
+                select = sourceTable.select(selectExpr.toArray(new Expression[0]));
+            }
+            final ProjectQueryOperation projectQueryOperation =
+                    (ProjectQueryOperation) select.getQueryOperation();
+            final AggregateQueryOperation aggQueryOperation =
+                    (AggregateQueryOperation) select.getQueryOperation().getChildren().get(0);
+
+            final List<ResolvedExpression> selectExpr =
+                    recreateSelectList(aggQueryOperation, projectQueryOperation);
+
+            final String selectAsSerializableString = toSerializableExpr(selectExpr);
+            final String groupByAsSerializableString =
+                    toSerializableExpr(aggQueryOperation.getGroupingExpressions());
+
+            final StringBuilder stringBuilder = new StringBuilder();
+            stringBuilder
+                    .append("SELECT ")
+                    .append(selectAsSerializableString)
+                    .append(" FROM ")
+                    .append(sourceTable);
+            if (!groupByAsSerializableString.isEmpty()) {
+                stringBuilder.append(" GROUP BY ").append(groupByAsSerializableString);
+            }
+
+            return tEnv.sqlQuery(stringBuilder.toString()).execute();
+        }
+
+        @NotNull
+        private static List<ResolvedExpression> recreateSelectList(
+                AggregateQueryOperation aggQueryOperation,
+                ProjectQueryOperation projectQueryOperation) {
+            final List<String> projectSchemaFields =
+                    projectQueryOperation.getResolvedSchema().getColumnNames();
+            final List<String> aggSchemaFields =
+                    aggQueryOperation.getResolvedSchema().getColumnNames();
+            return IntStream.range(0, projectSchemaFields.size())
+                    .mapToObj(
+                            idx -> {
+                                final int indexInAgg =
+                                        aggSchemaFields.indexOf(projectSchemaFields.get(idx));
+                                if (indexInAgg >= 0) {
+                                    final int groupingExprCount =
+                                            aggQueryOperation.getGroupingExpressions().size();
+                                    if (indexInAgg < groupingExprCount) {
+                                        return aggQueryOperation
+                                                .getGroupingExpressions()
+                                                .get(indexInAgg);
+                                    } else {
+                                        return aggQueryOperation
+                                                .getAggregateExpressions()
+                                                .get(indexInAgg - groupingExprCount);
+                                    }
+                                } else {
+                                    return projectQueryOperation.getProjectList().get(idx);
+                                }
+                            })
+                    .collect(Collectors.toList());
+        }
+
+        private static String toSerializableExpr(List<ResolvedExpression> expressions) {
+            return expressions.stream()
+                    .map(ResolvedExpression::asSerializableString)
+                    .collect(Collectors.joining(", "));
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "[API as SQL] select: [%s] groupBy: [%s]",
+                    selectExpr.stream()
+                            .map(Expression::asSummaryString)
+                            .collect(Collectors.joining(", ")),
+                    groupByExpr != null
+                            ? groupByExpr.stream()
+                                    .map(Expression::asSummaryString)
+                                    .collect(Collectors.joining(", "))
+                            : "");
         }
     }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
index b7d0ae13294..566a0106083 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInFunctionTestBase.java
@@ -28,8 +28,10 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.operations.ProjectQueryOperation;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -76,7 +78,7 @@ import static org.assertj.core.api.Assertions.catchThrowable;
 abstract class BuiltInFunctionTestBase {
 
     @RegisterExtension
-    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension();
+    public static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension();
 
     Configuration getConfiguration() {
         return new Configuration();
@@ -283,6 +285,7 @@ abstract class BuiltInFunctionTestBase {
                 List<AbstractDataType<?>> tableApiDataType,
                 List<AbstractDataType<?>> sqlDataType) {
             testItems.add(new TableApiResultTestItem(expression, result, tableApiDataType));
+            testItems.add(new TableApiSqlResultTestItem(expression, result, tableApiDataType));
             testItems.add(
                     new SqlResultTestItem(String.join(",", sqlExpression), result, sqlDataType));
             return this;
@@ -457,6 +460,36 @@ abstract class BuiltInFunctionTestBase {
         }
     }
 
+    private static class TableApiSqlResultTestItem extends ResultTestItem<List<Expression>> {
+
+        TableApiSqlResultTestItem(
+                List<Expression> expressions,
+                List<Object> results,
+                List<AbstractDataType<?>> dataTypes) {
+            super(expressions, results, dataTypes);
+        }
+
+        @Override
+        Table query(TableEnvironment env, Table inputTable) {
+            final Table select = inputTable.select(expression.toArray(new Expression[] {}));
+            final ProjectQueryOperation projectQueryOperation =
+                    (ProjectQueryOperation) select.getQueryOperation();
+            final String exprAsSerializableString =
+                    projectQueryOperation.getProjectList().stream()
+                            .map(ResolvedExpression::asSerializableString)
+                            .collect(Collectors.joining(", "));
+            return env.sqlQuery("SELECT " + exprAsSerializableString + " FROM " + inputTable);
+        }
+
+        @Override
+        public String toString() {
+            return "[API as SQL] "
+                    + expression.stream()
+                            .map(Expression::asSummaryString)
+                            .collect(Collectors.joining(", "));
+        }
+    }
+
     private static class TableApiErrorTestItem extends ErrorTestItem<Expression> {
 
         TableApiErrorTestItem(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/IfThenElseFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/IfThenElseFunctionITCase.java
new file mode 100644
index 00000000000..a01aeac65e9
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/IfThenElseFunctionITCase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.lit;
+
+/** IT tests for {@link BuiltInFunctionDefinitions#IF}. */
+class IfThenElseFunctionITCase extends BuiltInFunctionTestBase {
+
+    @Override
+    Stream<TestSetSpec> getTestSetSpecs() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.IF)
+                        .onFieldsWithData(2)
+                        .andDataTypes(DataTypes.INT())
+                        .testResult(
+                                Expressions.ifThenElse(
+                                        $("f0").isGreater(lit(0)), lit("GREATER"), lit("SMALLER")),
+                                "CASE WHEN f0 > 0 THEN 'GREATER' ELSE 'SMALLER' END",
+                                "GREATER",
+                                DataTypes.CHAR(7).notNull())
+                        .testResult(
+                                Expressions.ifThenElse(
+                                        $("f0").isGreater(lit(0)),
+                                        lit("GREATER"),
+                                        Expressions.ifThenElse(
+                                                $("f0").isEqual(0), lit("EQUAL"), lit("SMALLER"))),
+                                "CASE WHEN f0 > 0 THEN 'GREATER' ELSE CASE WHEN f0 = 0 THEN 'EQUAL' ELSE 'SMALLER' END END",
+                                "GREATER",
+                                DataTypes.VARCHAR(7).notNull()));
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
index cec9f237e68..34b00813d75 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
@@ -53,9 +53,8 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(INSERT, "C", 3)))
                         .testResult(
                                 source -> "SELECT JSON_OBJECTAGG(f0 VALUE f1) FROM " + source,
-                                source ->
-                                        source.select(
-                                                jsonObjectAgg(JsonOnNull.NULL, $("f0"), $("f1"))),
+                                TableApiAggSpec.select(
+                                        jsonObjectAgg(JsonOnNull.NULL, $("f0"), $("f1"))),
                                 ROW(VARCHAR(2000).notNull()),
                                 ROW(STRING().notNull()),
                                 Collections.singletonList(Row.of("{\"A\":1,\"B\":null,\"C\":3}"))),
@@ -71,9 +70,8 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                 source ->
                                         "SELECT JSON_OBJECTAGG(f0 VALUE f1 ABSENT ON NULL) FROM "
                                                 + source,
-                                source ->
-                                        source.select(
-                                                jsonObjectAgg(JsonOnNull.ABSENT, $("f0"), $("f1"))),
+                                TableApiAggSpec.select(
+                                        jsonObjectAgg(JsonOnNull.ABSENT, $("f0"), $("f1"))),
                                 ROW(VARCHAR(2000).notNull()),
                                 ROW(STRING().notNull()),
                                 Collections.singletonList(Row.of("{\"A\":1,\"C\":3}"))),
@@ -88,9 +86,8 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(DELETE, "B", 2)))
                         .testResult(
                                 source -> "SELECT JSON_OBJECTAGG(f0 VALUE f1) FROM " + source,
-                                source ->
-                                        source.select(
-                                                jsonObjectAgg(JsonOnNull.NULL, $("f0"), $("f1"))),
+                                TableApiAggSpec.select(
+                                        jsonObjectAgg(JsonOnNull.NULL, $("f0"), $("f1"))),
                                 ROW(VARCHAR(2000).notNull()),
                                 ROW(STRING().notNull()),
                                 Collections.singletonList(Row.of("{\"A\":1,\"C\":3}"))),
@@ -108,12 +105,10 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                         "SELECT f0, JSON_OBJECTAGG(f1 VALUE f2) FROM "
                                                 + source
                                                 + " GROUP BY f0",
-                                source ->
-                                        source.groupBy($("f0"))
-                                                .select(
-                                                        $("f0"),
-                                                        jsonObjectAgg(
-                                                                JsonOnNull.NULL, $("f1"), $("f2"))),
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f0")),
+                                        $("f0"),
+                                        jsonObjectAgg(JsonOnNull.NULL, $("f1"), $("f2"))),
                                 ROW(INT(), VARCHAR(2000).notNull()),
                                 ROW(INT(), STRING().notNull()),
                                 Arrays.asList(
@@ -131,10 +126,9 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                 source ->
                                         "SELECT max(f1), JSON_OBJECTAGG(f0 VALUE f1) FROM "
                                                 + source,
-                                source ->
-                                        source.select(
-                                                $("f1").max(),
-                                                jsonObjectAgg(JsonOnNull.NULL, $("f0"), $("f1"))),
+                                TableApiAggSpec.select(
+                                        $("f1").max(),
+                                        jsonObjectAgg(JsonOnNull.NULL, $("f0"), $("f1"))),
                                 ROW(INT(), VARCHAR(2000).notNull()),
                                 ROW(INT(), STRING().notNull()),
                                 Collections.singletonList(
@@ -153,13 +147,11 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                         "SELECT f0, JSON_OBJECTAGG(f1 VALUE f2), max(f2) FROM "
                                                 + source
                                                 + " GROUP BY f0",
-                                source ->
-                                        source.groupBy($("f0"))
-                                                .select(
-                                                        $("f0"),
-                                                        jsonObjectAgg(
-                                                                JsonOnNull.NULL, $("f1"), $("f2")),
-                                                        $("f2").max()),
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f0")),
+                                        $("f0"),
+                                        jsonObjectAgg(JsonOnNull.NULL, $("f1"), $("f2")),
+                                        $("f2").max()),
                                 ROW(INT(), VARCHAR(2000).notNull(), INT()),
                                 ROW(INT(), STRING().notNull(), INT()),
                                 Arrays.asList(
@@ -177,7 +169,7 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(INSERT, "C")))
                         .testResult(
                                 source -> "SELECT JSON_ARRAYAGG(f0) FROM " + source,
-                                source -> source.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
+                                TableApiAggSpec.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
                                 ROW(VARCHAR(2000).notNull()),
                                 ROW(STRING().notNull()),
                                 Collections.singletonList(Row.of("[\"A\",\"C\"]"))),
@@ -191,7 +183,7 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(INSERT, "C")))
                         .testResult(
                                 source -> "SELECT JSON_ARRAYAGG(f0 NULL ON NULL) FROM " + source,
-                                source -> source.select(jsonArrayAgg(JsonOnNull.NULL, $("f0"))),
+                                TableApiAggSpec.select(jsonArrayAgg(JsonOnNull.NULL, $("f0"))),
                                 ROW(VARCHAR(2000).notNull()),
                                 ROW(STRING().notNull()),
                                 Collections.singletonList(Row.of("[\"A\",null,\"C\"]"))),
@@ -206,7 +198,7 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(DELETE, 2)))
                         .testResult(
                                 source -> "SELECT JSON_ARRAYAGG(f0) FROM " + source,
-                                source -> source.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
+                                TableApiAggSpec.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
                                 ROW(VARCHAR(2000).notNull()),
                                 ROW(STRING().notNull()),
                                 Collections.singletonList(Row.of("[1,3]"))),
@@ -220,10 +212,8 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                         Row.ofKind(INSERT, "C")))
                         .testResult(
                                 source -> "SELECT max(f0), JSON_ARRAYAGG(f0) FROM " + source,
-                                source ->
-                                        source.select(
-                                                $("f0").max(),
-                                                jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
+                                TableApiAggSpec.select(
+                                        $("f0").max(), jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
                                 ROW(STRING(), VARCHAR(2000).notNull()),
                                 ROW(STRING(), STRING().notNull()),
                                 Collections.singletonList(Row.of("C", "[\"A\",\"C\"]"))),
@@ -241,12 +231,11 @@ class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTestBase {
                                         "SELECT f0, max(f1), JSON_ARRAYAGG(f1)FROM "
                                                 + source
                                                 + " GROUP BY f0",
-                                source ->
-                                        source.groupBy($("f0"))
-                                                .select(
-                                                        $("f0"),
-                                                        $("f1").max(),
-                                                        jsonArrayAgg(JsonOnNull.ABSENT, $("f1"))),
+                                TableApiAggSpec.groupBySelect(
+                                        Collections.singletonList($("f0")),
+                                        $("f0"),
+                                        $("f1").max(),
+                                        jsonArrayAgg(JsonOnNull.ABSENT, $("f1"))),
                                 ROW(INT(), STRING(), VARCHAR(2000).notNull()),
                                 ROW(INT(), STRING(), STRING().notNull()),
                                 Arrays.asList(

Reply via email to