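I found that switching to streaming mode makes this problem go away. In batch mode, defining a watermark should not be necessary. This looks like a bug, so I suggest opening a JIRA ticket with the community.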
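A minimal sketch of the streaming-mode workaround, assuming Flink 1.13 and the table definition from the quoted message. The class name `StreamingOverWindowSketch` is hypothetical, the table is trimmed to the columns used here, and the `UNBOUNDED_RANGE` constant replaces the string `"unbounded_range"` from the quoted code. The connector options are copied from the quoted message, including the elided URL.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.CURRENT_RANGE;
import static org.apache.flink.table.api.Expressions.UNBOUNDED_RANGE;

// Hypothetical class name; a sketch, not the original poster's program.
public class StreamingOverWindowSketch {
    public static void main(String[] args) {
        // inStreamingMode() replaces inBatchMode() from the quoted program.
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Same shape as the quoted DDL: the watermark makes f_time a time attribute.
        tableEnv.executeSql(
                "CREATE TABLE t_yyp_test (\n"
                        + "  f_id INT,\n"
                        + "  f_h STRING,\n"
                        + "  f_l STRING,\n"
                        + "  f_j STRING,\n"
                        + "  f_value DOUBLE,\n"
                        + "  f_time TIMESTAMP(3),\n"
                        + "  PRIMARY KEY (f_id) NOT ENFORCED,\n"
                        + "  WATERMARK FOR f_time AS f_time\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:mysql://***',\n"
                        + "  'table-name' = '123',\n"
                        + "  'username' = '123',\n"
                        + "  'password' = '123'\n"
                        + ")");

        // Order the over window by the column that carries the watermark (f_time).
        Table table = tableEnv
                .from("t_yyp_test")
                .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
                        .orderBy($("f_time"))
                        .preceding(UNBOUNDED_RANGE)
                        .following(CURRENT_RANGE)
                        .as("w"))
                .select($("f_h"), $("f_l"), $("f_j"),
                        $("f_value").avg().over($("w")));

        // Prints the resolved schema; the query itself is not executed here.
        table.printSchema();
    }
}
```

The window is ordered by `f_time` rather than `f_time_bak` because only `f_time` has a watermark declared on it.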

yanyunpeng <[email protected]> wrote on Wed, Aug 4, 2021 at 5:42 PM:

> Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute.
> at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
> at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
> at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249)
> at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160)
> at java.util.Optional.orElseGet(Optional.java:267)
> at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160)
> at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
> at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
> at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
> at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
> at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
> at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
> at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
> at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
> at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
> at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
> at java.util.function.Function.lambda$andThen$1(Function.java:88)
> at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
> at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
> at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183)
> at org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956)
> at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44)
>
> Flink version 1.13.0
>
> On Aug 4, 2021 at 17:37, Shengkai Fang <[email protected]> wrote:
>
>
> Could you post the full exception stack trace? Which version are you using?
>
> yanyunpeng <[email protected]> wrote on Wed, Aug 4, 2021 at 2:47 PM:
>
> > Table table = tableEnv
> >     .from("t_yyp_test")
> >     .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
> >         .orderBy($("f_time"))
> >         .preceding("unbounded_range")
> >         .following(CURRENT_RANGE)
> >         .as("w"))
> >     .select($("f_value"),
> >         $("f_h"),
> >         $("f_l"),
> >         $("f_j"),
> >         $("f_value").avg().over($("w")),
> >         $("f_value").varPop().over($("w")),
> >         $("f_value").stddevPop().over($("w")));
> >
> > The result is the same:
> >
> > Exception in thread "main" org.apache.flink.table.api.ValidationException:
> > Ordering must be defined on a time attribute.
> >
> > On Aug 4, 2021 at 14:34, Caizhi Weng <[email protected]> wrote:
> >
> > Hi! The ORDER BY field is f_time_bak, but the watermark is defined on
> > f_time. The two do not match.
> >
> > yanyunpeng <[email protected]> wrote on Wed, Aug 4, 2021 at 2:30 PM:
> >
> > > The code is as follows:
> > >
> > > EnvironmentSettings bbSettings =
> > >     EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> > > TableEnvironment tableEnv = TableEnvironment.create(bbSettings);
> > > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" +
> > >     " f_id INT,\n" +
> > >     " f_h STRING,\n" +
> > >     " f_l STRING,\n" +
> > >     " f_j STRING,\n" +
> > >     " f_value DOUBLE,\n" +
> > >     " f_time TIMESTAMP(3)\n, " +
> > >     " f_time_bak TIMESTAMP(3)\n, " +
> > >     " PRIMARY KEY (f_id) NOT ENFORCED,\n" +
> > >     " WATERMARK FOR f_time AS f_time \n" +
> > >     ") WITH (\n" +
> > >     " 'connector' = 'jdbc',\n" +
> > >     " 'url' = 'jdbc:mysql://***',\n" +
> > >     " 'table-name' = '123',\n" +
> > >     " 'username' = '123',\n" +
> > >     " 'password' = '123'\n" +
> > >     ")");
> > > tableEnv.registerFunction("GaussianFunction", new GaussianFunction());
> > > Table table = tableEnv
> > >     .from("t_yyp_test")
> > >     .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
> > >         .orderBy($("f_time_bak"))
> > >         .preceding("unbounded_range")
> > >         .following(CURRENT_RANGE)
> > >         .as("w"))
> > >     .select($("f_h"),
> > >         $("f_l"),
> > >         $("f_j"),
> > >         $("f_value").avg().over($("w")),
> > >         $("f_value").varPop().over($("w")),
> > >         $("f_value").stddevPop().over($("w")));
> > >
> > > Event time is already defined, yet ordering by the event-time field or
> > > by any other time field fails with:
> > >
> > > Exception in thread "main" org.apache.flink.table.api.ValidationException:
> > > Ordering must be defined on a time attribute.
> > >
> > > What is causing this?
