Re:Re: Flink Sql 1.13 UDF ERROR
Hi, Jingsong. 最新的类型推导相对于之前版本的类型推导更加严格,对schema的非空限制校验也更加细致。 在之前提到的例子中使用基本类型做UDF参数, 表示跟UDF中参数相关的列必须非空,而在创建视图时,每个类型默认的非空限制为false,因此出现了之前描述的问题。 祝好。 Best Roc. 在 2021-06-29 11:02:55,"Jingsong Li" 写道: >Hi, > >你可以创建个JIRA,让Timo看看,UDAF引入了新的类型推导,可能有问题 > >Best, >Jingsong > >On Tue, Jun 29, 2021 at 7:10 AM Roc Marshal wrote: > >> >> >> Hi, All. >> >> >> 请教一个在最新的1.13.1 api升级调研中遇到的问题,谢谢大家: >> >> >> 版本: 1.13.1 >> 运行模式: IDE-application >> --- >> about udf define... >> >> >> public static class UDFAggregateFunction extends >> AggregateFunction { >> >> >> //返回最终结果 >> @Override >> public Double getValue(AccumulatorBean acc) { >> return acc.totalPrice / acc.totalNum; >> } >> >> >> //构建保存中间结果的对象 >> @Override >> public AccumulatorBean createAccumulator() { >> return new AccumulatorBean(); >> } >> >> >> //减去要撤回的值 >> public void retract(AccumulatorBean acc, double price, long num) { >> acc.totalPrice -= price * num; >> acc.totalNum -= num; >> } >> >> >> //从每个分区把数据取出来然后合并 >> public void merge(AccumulatorBean acc, Iterable >> it) { >> >> >> Iterator iter = it.iterator(); >> while (iter.hasNext()) { >> AccumulatorBean a = iter.next(); >> this.accumulate(acc, a.totalPrice, a.totalNum); >> } >> } >> >> >> //重置内存中值时调用 >> public void resetAccumulator(AccumulatorBean acc) { >> acc.totalNum = 0; >> acc.totalPrice = 0; >> } >> >> >> //和传入数据进行计算的逻辑 >> public void accumulate(AccumulatorBean acc, double price, long >> num) { >> acc.totalPrice += price * num; >> acc.totalNum += num; >> } >> } >> >> >> >> >> About main calling >> //TODO 流批一体的 Table API >> TableEnvironment tableEnvironment = >> TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); >> List dataList = new ArrayList<>(); >> dataList.add(Row.of("张三", "可乐", 20.0D, 4L)); >> dataList.add(Row.of("张三", "果汁", 10.0D, 4L)); >> dataList.add(Row.of("李四", "咖啡", 10.0D, 2L)); >> Table table = tableEnvironment.fromValues(DataTypes.ROW( >> DataTypes.FIELD("user", DataTypes.STRING()), >> DataTypes.FIELD("name", DataTypes.STRING()), >> DataTypes.FIELD("price", DataTypes.DOUBLE()), >> DataTypes.FIELD("num", DataTypes.BIGINT()) >> ), >> dataList); >> tableEnvironment.createTemporaryView("orders", table); >> >> >> tableEnvironment.createTemporaryFunction("c_agg", new >> UDFAggregateFunction()); >> >> >> tableEnvironment.executeSql("select user, c_agg(price, num) as >> udf_field from orders group by user").print(); >> >> >> >> >> >> >> >> 异常堆栈- >> >> >> >> >> default_catalog.default_database.c_agg(DOUBLE, BIGINT) >> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157) >> at >> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110) >> at >> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201) >> at >> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) >> at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724) >> at >> com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139) >> Caused by: org.apache.flink.table.api.ValidationException: Invalid >> function call: >> default_catalog.default_database.c_agg(DOUBLE, BIGINT) >> at >> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194) >> at >> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89) >> at >> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679) >> at >> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444) >> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330) >> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709) >> at >> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696) >> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) >> at >>
Re: Flink Sql 1.13 UDF ERROR
Hi, 你可以创建个JIRA,让Timo看看,UDAF引入了新的类型推导,可能有问题 Best, Jingsong On Tue, Jun 29, 2021 at 7:10 AM Roc Marshal wrote: > > > Hi, All. > > > 请教一个在最新的1.13.1 api升级调研中遇到的问题,谢谢大家: > > > 版本: 1.13.1 > 运行模式: IDE-application > --- > about udf define... > > > public static class UDFAggregateFunction extends > AggregateFunction { > > > //返回最终结果 > @Override > public Double getValue(AccumulatorBean acc) { > return acc.totalPrice / acc.totalNum; > } > > > //构建保存中间结果的对象 > @Override > public AccumulatorBean createAccumulator() { > return new AccumulatorBean(); > } > > > //减去要撤回的值 > public void retract(AccumulatorBean acc, double price, long num) { > acc.totalPrice -= price * num; > acc.totalNum -= num; > } > > > //从每个分区把数据取出来然后合并 > public void merge(AccumulatorBean acc, Iterable > it) { > > > Iterator iter = it.iterator(); > while (iter.hasNext()) { > AccumulatorBean a = iter.next(); > this.accumulate(acc, a.totalPrice, a.totalNum); > } > } > > > //重置内存中值时调用 > public void resetAccumulator(AccumulatorBean acc) { > acc.totalNum = 0; > acc.totalPrice = 0; > } > > > //和传入数据进行计算的逻辑 > public void accumulate(AccumulatorBean acc, double price, long > num) { > acc.totalPrice += price * num; > acc.totalNum += num; > } > } > > > > > About main calling > //TODO 流批一体的 Table API > TableEnvironment tableEnvironment = > TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); > List dataList = new ArrayList<>(); > dataList.add(Row.of("张三", "可乐", 20.0D, 4L)); > dataList.add(Row.of("张三", "果汁", 10.0D, 4L)); > dataList.add(Row.of("李四", "咖啡", 10.0D, 2L)); > Table table = tableEnvironment.fromValues(DataTypes.ROW( > DataTypes.FIELD("user", DataTypes.STRING()), > DataTypes.FIELD("name", DataTypes.STRING()), > DataTypes.FIELD("price", DataTypes.DOUBLE()), > DataTypes.FIELD("num", DataTypes.BIGINT()) > ), > dataList); > tableEnvironment.createTemporaryView("orders", table); > > > tableEnvironment.createTemporaryFunction("c_agg", new > UDFAggregateFunction()); > > > tableEnvironment.executeSql("select user, c_agg(price, num) as > udf_field from orders group by user").print(); > > > > > > > > 异常堆栈- > > > > > default_catalog.default_database.c_agg(DOUBLE, BIGINT) > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724) > at > com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139) > Caused by: org.apache.flink.table.api.ValidationException: Invalid > function call: > default_catalog.default_database.c_agg(DOUBLE, BIGINT) > at > org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194) > at > org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89) > at > org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679) > at > org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444) > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330) > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726) > at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709) > at >
Flink Sql 1.13 UDF ERROR
Hi, All. 请教一个在最新的1.13.1 api升级调研中遇到的问题,谢谢大家: 版本: 1.13.1 运行模式: IDE-application --- about udf define... public static class UDFAggregateFunction extends AggregateFunction { //返回最终结果 @Override public Double getValue(AccumulatorBean acc) { return acc.totalPrice / acc.totalNum; } //构建保存中间结果的对象 @Override public AccumulatorBean createAccumulator() { return new AccumulatorBean(); } //减去要撤回的值 public void retract(AccumulatorBean acc, double price, long num) { acc.totalPrice -= price * num; acc.totalNum -= num; } //从每个分区把数据取出来然后合并 public void merge(AccumulatorBean acc, Iterable it) { Iterator iter = it.iterator(); while (iter.hasNext()) { AccumulatorBean a = iter.next(); this.accumulate(acc, a.totalPrice, a.totalNum); } } //重置内存中值时调用 public void resetAccumulator(AccumulatorBean acc) { acc.totalNum = 0; acc.totalPrice = 0; } //和传入数据进行计算的逻辑 public void accumulate(AccumulatorBean acc, double price, long num) { acc.totalPrice += price * num; acc.totalNum += num; } } About main calling //TODO 流批一体的 Table API TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); List dataList = new ArrayList<>(); dataList.add(Row.of("张三", "可乐", 20.0D, 4L)); dataList.add(Row.of("张三", "果汁", 10.0D, 4L)); dataList.add(Row.of("李四", "咖啡", 10.0D, 2L)); Table table = tableEnvironment.fromValues(DataTypes.ROW( DataTypes.FIELD("user", DataTypes.STRING()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("price", DataTypes.DOUBLE()), DataTypes.FIELD("num", DataTypes.BIGINT()) ), dataList); tableEnvironment.createTemporaryView("orders", table); tableEnvironment.createTemporaryFunction("c_agg", new UDFAggregateFunction()); tableEnvironment.executeSql("select user, c_agg(price, num) as udf_field from orders group by user").print(); 异常堆栈- default_catalog.default_database.c_agg(DOUBLE, BIGINT) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724) at com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139) Caused by: org.apache.flink.table.api.ValidationException: Invalid function call: default_catalog.default_database.c_agg(DOUBLE, BIGINT) at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194) at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89) at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679) at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726) at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726) at
Flink Sql 1.13 UDF ERROR
Hi, All. 请教一个在最新的1.13.1 api升级调研中遇到的问题,谢谢大家: 版本: 1.13.1 运行模式: IDE-application --- about udf define... public static class UDFAggregateFunction extends AggregateFunction { //返回最终结果 @Override public Double getValue(AccumulatorBean acc) { return acc.totalPrice / acc.totalNum; } //构建保存中间结果的对象 @Override public AccumulatorBean createAccumulator() { return new AccumulatorBean(); } //减去要撤回的值 public void retract(AccumulatorBean acc, double price, long num) { acc.totalPrice -= price * num; acc.totalNum -= num; } //从每个分区把数据取出来然后合并 public void merge(AccumulatorBean acc, Iterable it) { Iterator iter = it.iterator(); while (iter.hasNext()) { AccumulatorBean a = iter.next(); this.accumulate(acc, a.totalPrice, a.totalNum); } } //重置内存中值时调用 public void resetAccumulator(AccumulatorBean acc) { acc.totalNum = 0; acc.totalPrice = 0; } //和传入数据进行计算的逻辑 public void accumulate(AccumulatorBean acc, double price, long num) { acc.totalPrice += price * num; acc.totalNum += num; } } About main calling //TODO 流批一体的 Table API TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); List dataList = new ArrayList<>(); dataList.add(Row.of("张三", "可乐", 20.0D, 4L)); dataList.add(Row.of("张三", "果汁", 10.0D, 4L)); dataList.add(Row.of("李四", "咖啡", 10.0D, 2L)); Table table = tableEnvironment.fromValues(DataTypes.ROW( DataTypes.FIELD("user", DataTypes.STRING()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("price", DataTypes.DOUBLE()), DataTypes.FIELD("num", DataTypes.BIGINT()) ), dataList); tableEnvironment.createTemporaryView("orders", table); tableEnvironment.createTemporaryFunction("c_agg", new UDFAggregateFunction()); tableEnvironment.executeSql("select user, c_agg(price, num) as udf_field from orders group by user").print(); 异常堆栈- default_catalog.default_database.c_agg(DOUBLE, BIGINT) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724) at com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139) Caused by: org.apache.flink.table.api.ValidationException: Invalid function call: default_catalog.default_database.c_agg(DOUBLE, BIGINT) at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194) at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89) at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679) at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726) at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726) at