This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.15 by this push: new dbfe5da2a2b [FLINK-24586][table-planner] JSON_VALUE should return STRING instead of VARCHAR(2000) dbfe5da2a2b is described below commit dbfe5da2a2bc4564a1cf715a918299d107758b6c Author: tsreaper <tsreape...@gmail.com> AuthorDate: Wed Mar 9 16:25:58 2022 +0800 [FLINK-24586][table-planner] JSON_VALUE should return STRING instead of VARCHAR(2000) This closes #19014. --- .../functions/sql/FlinkSqlOperatorTable.java | 2 +- .../functions/sql/SqlJsonValueFunctionWrapper.java | 83 ++++++++++++++++++++++ .../planner/functions/JsonFunctionsITCase.java | 9 ++- 3 files changed, 88 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index cd949961058..2b18ffee5e3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -1140,7 +1140,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { // JSON FUNCTIONS public static final SqlFunction JSON_EXISTS = SqlStdOperatorTable.JSON_EXISTS; - public static final SqlFunction JSON_VALUE = SqlStdOperatorTable.JSON_VALUE; + public static final SqlFunction JSON_VALUE = new SqlJsonValueFunctionWrapper("JSON_VALUE"); public static final SqlFunction JSON_QUERY = new SqlJsonQueryFunctionWrapper(); public static final SqlFunction JSON_OBJECT = new SqlJsonObjectFunctionWrapper(); public static final SqlAggFunction JSON_OBJECTAGG_NULL_ON_NULL = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlJsonValueFunctionWrapper.java 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlJsonValueFunctionWrapper.java new file mode 100644 index 00000000000..b28ef4786e4 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlJsonValueFunctionWrapper.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.sql; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlJsonValueReturning; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.fun.SqlJsonValueFunction; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlReturnTypeInference; +import org.apache.calcite.sql.type.SqlTypeTransforms; + +import java.util.Optional; + +import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.VARCHAR_FORCE_NULLABLE; + +/** + * This class is a wrapper class for the {@link SqlJsonValueFunction} but using the {@code + * VARCHAR_FORCE_NULLABLE} return type inference by default. 
It also supports specifying return type + * with the RETURNING keyword just like the original {@link SqlJsonValueFunction}. + */ +class SqlJsonValueFunctionWrapper extends SqlJsonValueFunction { + + private final SqlReturnTypeInference returnTypeInference; + + SqlJsonValueFunctionWrapper(String name) { + super(name); + this.returnTypeInference = + ReturnTypes.cascade( + SqlJsonValueFunctionWrapper::explicitTypeSpec, + SqlTypeTransforms.FORCE_NULLABLE) + .orElse(VARCHAR_FORCE_NULLABLE); + } + + @Override + public RelDataType inferReturnType(SqlOperatorBinding opBinding) { + RelDataType returnType = returnTypeInference.inferReturnType(opBinding); + if (returnType == null) { + throw new IllegalArgumentException( + "Cannot infer return type for " + + opBinding.getOperator() + + "; operand types: " + + opBinding.collectOperandTypes()); + } + return returnType; + } + + @Override + public SqlReturnTypeInference getReturnTypeInference() { + return returnTypeInference; + } + + /** + * Copied and modified from the original {@link SqlJsonValueFunction}. + * + * <p>Changes: Instead of returning {@link Optional} this method returns null directly. 
+ */ + private static RelDataType explicitTypeSpec(SqlOperatorBinding opBinding) { + if (opBinding.getOperandCount() > 2 + && opBinding.isOperandLiteral(2, false) + && opBinding.getOperandLiteralValue(2, Object.class) + instanceof SqlJsonValueReturning) { + return opBinding.getOperandType(3); + } + return null; + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java index e01c0f74df8..9fb64e7e58b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java @@ -57,7 +57,6 @@ import static org.apache.flink.table.api.DataTypes.STRING; import static org.apache.flink.table.api.DataTypes.TIMESTAMP; import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE; import static org.apache.flink.table.api.DataTypes.VARBINARY; -import static org.apache.flink.table.api.DataTypes.VARCHAR; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; import static org.apache.flink.table.api.Expressions.jsonArray; @@ -183,7 +182,7 @@ public class JsonFunctionsITCase extends BuiltInFunctionTestBase { "JSON_VALUE(CAST(NULL AS STRING), 'lax $')", null, STRING(), - VARCHAR(2000)), + STRING()), // RETURNING + Supported Data Types resultSpec( @@ -191,7 +190,7 @@ public class JsonFunctionsITCase extends BuiltInFunctionTestBase { "JSON_VALUE(f0, '$.type')", "account", STRING(), - VARCHAR(2000)), + STRING()), resultSpec( $("f0").jsonValue("$.activated", BOOLEAN()), "JSON_VALUE(f0, '$.activated' RETURNING BOOLEAN)", @@ -220,7 +219,7 @@ public class JsonFunctionsITCase extends BuiltInFunctionTestBase { "JSON_VALUE(f0, 'lax $.invalid' NULL ON EMPTY ERROR 
ON ERROR)", null, STRING(), - VARCHAR(2000)), + STRING()), resultSpec( $("f0").jsonValue( "lax $.invalid", @@ -243,7 +242,7 @@ public class JsonFunctionsITCase extends BuiltInFunctionTestBase { "JSON_VALUE(f0, 'strict $.invalid' ERROR ON EMPTY NULL ON ERROR)", null, STRING(), - VARCHAR(2000)), + STRING()), resultSpec( $("f0").jsonValue( "strict $.invalid",