This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 87d80878bda035d4697b65f75a6261a6054b60a9
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon May 25 16:18:46 2020 +0200

    [hotfix][table-common] Add constraint argument type strategy
---
 .../table/types/inference/InputTypeStrategies.java | 11 +++
 .../strategies/ConstraintArgumentTypeStrategy.java | 88 ++++++++++++++++++++++
 .../types/inference/InputTypeStrategiesTest.java   | 28 ++++++-
 3 files changed, 126 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
index db589db..615f392 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy
 import 
org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.ArrayInputTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.ComparableTypeStrategy;
+import 
org.apache.flink.table.types.inference.strategies.ConstraintArgumentTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.ExplicitArgumentTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.FamilyArgumentTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.LiteralArgumentTypeStrategy;
@@ -41,6 +42,7 @@ import 
org.apache.flink.table.types.logical.StructuredType.StructuredComparision
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -218,6 +220,15 @@ public final class InputTypeStrategies {
        }
 
        /**
+        * Strategy for an argument that must fulfill a given constraint.
+        */
+       public static ConstraintArgumentTypeStrategy constraint(
+                       String constraintMessage,
+                       Function<List<DataType>, Boolean> evaluator) {
+               return new ConstraintArgumentTypeStrategy(constraintMessage, 
evaluator);
+       }
+
+       /**
         * Strategy for a conjunction of multiple {@link ArgumentTypeStrategy}s 
into one like
         * {@code f(NUMERIC && LITERAL)}.
         *
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java
new file mode 100644
index 0000000..4457104
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ConstraintArgumentTypeStrategy.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature.Argument;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Strategy for an argument that must fulfill a given constraint.
+ */
+@Internal
+public final class ConstraintArgumentTypeStrategy implements 
ArgumentTypeStrategy {
+
+       private final String constraintMessage;
+
+       private final Function<List<DataType>, Boolean> evaluator;
+
+       public ConstraintArgumentTypeStrategy(
+                       String constraintMessage,
+                       Function<List<DataType>, Boolean> evaluator) {
+               this.constraintMessage = constraintMessage;
+               this.evaluator = evaluator;
+       }
+
+       @Override
+       public Optional<DataType> inferArgumentType(CallContext callContext, 
int argumentPos, boolean throwOnFailure) {
+               final List<DataType> actualDataTypes = 
callContext.getArgumentDataTypes();
+
+               // type fulfills constraint
+               if (evaluator.apply(actualDataTypes)) {
+                       return Optional.of(actualDataTypes.get(argumentPos));
+               }
+
+               if (throwOnFailure) {
+                       throw callContext.newValidationError(
+                               constraintMessage,
+                               actualDataTypes.toArray());
+               }
+               return Optional.empty();
+       }
+
+       @Override
+       public Argument getExpectedArgument(FunctionDefinition 
functionDefinition, int argumentPos) {
+               return Argument.of("<CONSTRAINT>");
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               ConstraintArgumentTypeStrategy that = 
(ConstraintArgumentTypeStrategy) o;
+               return constraintMessage.equals(that.constraintMessage) && 
evaluator.equals(that.evaluator);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(constraintMessage, evaluator);
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
index 38b1888..15f0bb9 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
@@ -34,6 +34,7 @@ import static 
org.apache.flink.table.types.inference.InputTypeStrategies.LITERAL
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.WILDCARD;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
+import static 
org.apache.flink.table.types.inference.InputTypeStrategies.constraint;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.explicit;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.explicitSequence;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.logical;
@@ -540,7 +541,32 @@ public class InputTypeStrategiesTest extends 
InputTypeStrategiesTestBase {
                                .calledWithArgumentTypes(DataTypes.FLOAT())
                                .expectSignature("f(<EXACT_NUMERIC>)")
                                .expectErrorMessage(
-                                       "Unsupported argument type. Expected 
type of family 'EXACT_NUMERIC' but actual type was 'FLOAT'.")
+                                       "Unsupported argument type. Expected 
type of family 'EXACT_NUMERIC' but actual type was 'FLOAT'."),
+
+                       TestSpec
+                               .forStrategy(
+                                       "Constraint argument type strategy",
+                                       sequence(
+                                               and(
+                                                       
explicit(DataTypes.BOOLEAN()),
+                                                       constraint(
+                                                               "%s must be 
nullable.",
+                                                               args -> 
args.get(0).getLogicalType().isNullable()))))
+                               .calledWithArgumentTypes(DataTypes.BOOLEAN())
+                               .expectSignature("f([BOOLEAN & <CONSTRAINT>])")
+                               .expectArgumentTypes(DataTypes.BOOLEAN()),
+
+                       TestSpec
+                               .forStrategy(
+                                       "Constraint argument type strategy 
invalid",
+                                       sequence(
+                                               and(
+                                                       
explicit(DataTypes.BOOLEAN().notNull()),
+                                                       constraint(
+                                                               "My constraint 
says %s must be nullable.",
+                                                               args -> 
args.get(0).getLogicalType().isNullable()))))
+                               
.calledWithArgumentTypes(DataTypes.BOOLEAN().notNull())
+                               .expectErrorMessage("My constraint says BOOLEAN 
NOT NULL must be nullable.")
                );
        }
 }

Reply via email to