This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 87d80878bda035d4697b65f75a6261a6054b60a9 Author: Timo Walther <twal...@apache.org> AuthorDate: Mon May 25 16:18:46 2020 +0200 [hotfix][table-common] Add constraint argument type strategy --- .../table/types/inference/InputTypeStrategies.java | 11 +++ .../strategies/ConstraintArgumentTypeStrategy.java | 88 ++++++++++++++++++++++ .../types/inference/InputTypeStrategiesTest.java | 28 ++++++- 3 files changed, 126 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java index db589db..615f392 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java @@ -24,6 +24,7 @@ import org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy import org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy; import org.apache.flink.table.types.inference.strategies.ArrayInputTypeStrategy; import org.apache.flink.table.types.inference.strategies.ComparableTypeStrategy; +import org.apache.flink.table.types.inference.strategies.ConstraintArgumentTypeStrategy; import org.apache.flink.table.types.inference.strategies.ExplicitArgumentTypeStrategy; import org.apache.flink.table.types.inference.strategies.FamilyArgumentTypeStrategy; import org.apache.flink.table.types.inference.strategies.LiteralArgumentTypeStrategy; @@ -41,6 +42,7 @@ import org.apache.flink.table.types.logical.StructuredType.StructuredComparision import java.util.Arrays; import java.util.List; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -218,6 +220,15 @@ public final class InputTypeStrategies { } /** + * Strategy for an argument that must fulfill a given constraint. + */ + public static ConstraintArgumentTypeStrategy constraint( + String constraintMessage, + Function<List<DataType>, Boolean> evaluator) { + return new ConstraintArgumentTypeStrategy(constraintMessage, evaluator); + } + + /** * Strategy for a conjunction of multiple {@link ArgumentTypeStrategy}s into one like * {@code f(NUMERIC && LITERAL)}. * diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java new file mode 100644 index 0000000..4457104 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentTypeStrategy; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.Signature.Argument; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + +/** + * Strategy for an argument that must fulfill a given constraint. + */ +@Internal +public final class ConstraintArgumentTypeStrategy implements ArgumentTypeStrategy { + + private final String constraintMessage; + + private final Function<List<DataType>, Boolean> evaluator; + + public ConstraintArgumentTypeStrategy( + String constraintMessage, + Function<List<DataType>, Boolean> evaluator) { + this.constraintMessage = constraintMessage; + this.evaluator = evaluator; + } + + @Override + public Optional<DataType> inferArgumentType(CallContext callContext, int argumentPos, boolean throwOnFailure) { + final List<DataType> actualDataTypes = callContext.getArgumentDataTypes(); + + // type fulfills constraint + if (evaluator.apply(actualDataTypes)) { + return Optional.of(actualDataTypes.get(argumentPos)); + } + + if (throwOnFailure) { + throw callContext.newValidationError( + constraintMessage, + actualDataTypes.toArray()); + } + return Optional.empty(); + } + + @Override + public Argument getExpectedArgument(FunctionDefinition functionDefinition, int argumentPos) { + return Argument.of("<CONSTRAINT>"); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ConstraintArgumentTypeStrategy that = (ConstraintArgumentTypeStrategy) o; + return constraintMessage.equals(that.constraintMessage) && evaluator.equals(that.evaluator); + } + + @Override + public int hashCode() { + return Objects.hash(constraintMessage, evaluator); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java index 38b1888..15f0bb9 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java @@ -34,6 +34,7 @@ import static org.apache.flink.table.types.inference.InputTypeStrategies.LITERAL import static org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL; import static org.apache.flink.table.types.inference.InputTypeStrategies.WILDCARD; import static org.apache.flink.table.types.inference.InputTypeStrategies.and; +import static org.apache.flink.table.types.inference.InputTypeStrategies.constraint; import static org.apache.flink.table.types.inference.InputTypeStrategies.explicit; import static org.apache.flink.table.types.inference.InputTypeStrategies.explicitSequence; import static org.apache.flink.table.types.inference.InputTypeStrategies.logical; @@ -540,7 +541,32 @@ public class InputTypeStrategiesTest extends InputTypeStrategiesTestBase { .calledWithArgumentTypes(DataTypes.FLOAT()) .expectSignature("f(<EXACT_NUMERIC>)") .expectErrorMessage( - "Unsupported argument type. Expected type of family 'EXACT_NUMERIC' but actual type was 'FLOAT'.") + "Unsupported argument type. Expected type of family 'EXACT_NUMERIC' but actual type was 'FLOAT'."), + + TestSpec + .forStrategy( + "Constraint argument type strategy", + sequence( + and( + explicit(DataTypes.BOOLEAN()), + constraint( + "%s must be nullable.", + args -> args.get(0).getLogicalType().isNullable())))) + .calledWithArgumentTypes(DataTypes.BOOLEAN()) + .expectSignature("f([BOOLEAN & <CONSTRAINT>])") + .expectArgumentTypes(DataTypes.BOOLEAN()), + + TestSpec + .forStrategy( + "Constraint argument type strategy invalid", + sequence( + and( + explicit(DataTypes.BOOLEAN().notNull()), + constraint( + "My constraint says %s must be nullable.", + args -> args.get(0).getLogicalType().isNullable())))) + .calledWithArgumentTypes(DataTypes.BOOLEAN().notNull()) + .expectErrorMessage("My constraint says BOOLEAN NOT NULL must be nullable.") ); } }