Re:Re: Flink Sql 1.13 UDF ERROR

2021-07-11 Post by Roc Marshal
Hi, Jingsong.


 The latest type inference is stricter than the type inference of previous versions, and the NOT NULL constraint checks against the schema are also more fine-grained.
 In the example mentioned earlier, primitive types are used as the UDF parameters, which means the columns bound to those parameters must be NOT NULL; when the view is created, however, every field type is nullable by default (its NOT NULL constraint is false), which is why the problem described earlier occurs.
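
 For illustration, here is a minimal sketch of the usual workaround (my own example, assuming the same schema as in the post below, not code from the original thread): declare the columns that are passed to c_agg as NOT NULL so that they match the primitive double/long parameters of accumulate(...).

 // Sketch: build the source table with NOT NULL input columns for the UDF.
 Table table = tableEnvironment.fromValues(
         DataTypes.ROW(
                 DataTypes.FIELD("user", DataTypes.STRING()),
                 DataTypes.FIELD("name", DataTypes.STRING()),
                 DataTypes.FIELD("price", DataTypes.DOUBLE().notNull()),
                 DataTypes.FIELD("num", DataTypes.BIGINT().notNull())),
         dataList);

 Alternatively, keeping the nullable schema and changing the accumulate/retract parameter types from double/long to Double/Long should also let the call validate, at the cost of handling possible null inputs inside the UDF.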







Best wishes,
Roc.








On 2021-06-29 11:02:55, "Jingsong Li" wrote:
>Hi,
>
>You could create a JIRA and ask Timo to take a look; UDAF now uses the new type inference, and there may be a problem with it.
>
>Best,
>Jingsong

Re: Flink Sql 1.13 UDF ERROR

2021-06-28 Post by Jingsong Li
Hi,

You could create a JIRA and ask Timo to take a look; UDAF now uses the new type inference, and there may be a problem with it.

Best,
Jingsong


Flink Sql 1.13 UDF ERROR

2021-06-28 Post by Roc Marshal


Hi, All.


I ran into a problem while investigating the API upgrade to the latest 1.13.1 and would appreciate your help. Thanks, everyone:


Version: 1.13.1
Run mode: IDE-application
---
About the UDF definition...


public static class UDFAggregateFunction extends AggregateFunction<Double, AccumulatorBean> {


// return the final result
@Override
public Double getValue(AccumulatorBean acc) {
return acc.totalPrice / acc.totalNum;
}


// create the object that holds the intermediate result
@Override
public AccumulatorBean createAccumulator() {
return new AccumulatorBean();
}


// subtract the values being retracted
public void retract(AccumulatorBean acc, double price, long num) {
acc.totalPrice -= price * num;
acc.totalNum -= num;
}


// pull the data from each partition and merge it
public void merge(AccumulatorBean acc, Iterable<AccumulatorBean> it) {


Iterator<AccumulatorBean> iter = it.iterator();
while (iter.hasNext()) {
AccumulatorBean a = iter.next();
this.accumulate(acc, a.totalPrice, a.totalNum);
}
}


// called when the in-memory values are reset
public void resetAccumulator(AccumulatorBean acc) {
acc.totalNum = 0;
acc.totalPrice = 0;
}


// logic that folds incoming data into the accumulator
public void accumulate(AccumulatorBean acc, double price, long num) {
acc.totalPrice += price * num;
acc.totalNum += num;
}
}



About the call in main
// TODO unified stream/batch Table API
TableEnvironment tableEnvironment = 
TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
List<Row> dataList = new ArrayList<>();
dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
Table table = tableEnvironment.fromValues(DataTypes.ROW(
DataTypes.FIELD("user", DataTypes.STRING()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("price", DataTypes.DOUBLE()),
DataTypes.FIELD("num", DataTypes.BIGINT())
),
dataList);
tableEnvironment.createTemporaryView("orders", table);


tableEnvironment.createTemporaryFunction("c_agg", new 
UDFAggregateFunction());


tableEnvironment.executeSql("select user, c_agg(price, num) as 
udf_field from orders group by user").print();






Exception stack trace -




default_catalog.default_database.c_agg(DOUBLE, BIGINT)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at 
com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
Caused by: org.apache.flink.table.api.ValidationException: Invalid function 
call:
default_catalog.default_database.c_agg(DOUBLE, BIGINT)
at 
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194)
at 
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
at 
