补充一下:SQL 中 dt 是 TIMESTAMP(3) 类型,同时也是定义 watermark 的字段。
________________________________
发件人: [email protected] <[email protected]>
发送时间: 2020年11月4日 17:29
收件人: [email protected] <[email protected]>
主题: flink1.11.0 sql自定义UDAF包含复合类型时报Incompatible types
Hi,all
本人使用的 Flink 版本为 1.11.0,自定义 UDAF 如下:
/**
 * User-defined aggregate function whose result is a {@code Row} and whose
 * accumulator is a {@code ContinuousListenDuration}.
 *
 * NOTE(review): on Flink 1.11, aggregate functions presumably still derive their
 * result type from getResultType()/TypeInformation rather than from
 * {@code @DataTypeHint}. If so, the Row result is not seen as a structured type
 * and field access on it in SQL fails - TODO confirm against the Flink 1.11
 * user-defined functions documentation.
 */
public class GetContinuousListenDuration extends AggregateFunction<Row,
ContinuousListenDuration> {
// Formatter with pattern "yyyy-MM-dd HH:mm"; not referenced by the code shown in this excerpt.
private static final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
/**
 * Builds the aggregate result from the accumulator.
 *
 * The hint declares the intended result type as a row of
 * (startTime TIMESTAMP(3), duration BIGINT).
 *
 * @param acc accumulator to read the start time and duration from
 * @return a two-field row: the accumulator's start time, then its duration
 */
@Override
@DataTypeHint("ROW<startTime TIMESTAMP(3), duration BIGINT>")
public Row getValue(ContinuousListenDuration acc) {
return Row.of(acc.getStartTime(), acc.getDuration());
}
/**
 * Creates the accumulator used for a new aggregation.
 *
 * @return a freshly constructed {@code ContinuousListenDuration}
 */
@Override
public ContinuousListenDuration createAccumulator() {
return new ContinuousListenDuration();
}
/**
 * Receives one input row for the aggregation.
 *
 * @param acc accumulator to update
 * @param dt timestamp argument, hinted as TIMESTAMP(3)
 * @param isListening Boolean flag argument
 */
public void accumulate(ContinuousListenDuration acc,
@DataTypeHint("TIMESTAMP(3)") LocalDateTime dt, Boolean isListening) {
// logic omitted here
}
}
聚合时以 TIMESTAMP(3)、BOOLEAN 作为参数,getValue 的返回类型是 ROW<startTime TIMESTAMP(3), duration BIGINT>,
函数注册名为 get_continuous_listen_duration,调用该函数的 SQL 如下:
-- Writes one aggregated row per (id, city_code) group into report.result.
insert into
report.result
select
id,
city_code,
-- Field startTime of the UDAF's row result. The second argument is true
-- when order_no is null or trims to an empty string.
get_continuous_listen_duration(
dt,
(order_no is null)
or (trim(order_no) = '')
).startTime as start_time,
-- Field duration of the same UDAF call (identical arguments).
get_continuous_listen_duration(
dt,
(order_no is null)
or (trim(order_no) = '')
).duration as duration
from
(
select
o.id,
o.dt,
o.order_no,
r.city_code
from
(
-- Takes id and order_no from req[1] and req[2] of tmp_v, keeping only rows
-- whose extra[1] is null or different from 'false'.
select
req [1] as id,
dt,
proctime,
req [2] as order_no
from
tmp_v
where
extra [1] is null
or extra [1] <> 'false'
) o
-- Temporal join against dim.right as of o.proctime, matched on id, to obtain city_code.
JOIN dim.right FOR SYSTEM_TIME AS OF o.proctime AS r ON r.id = o.id
) a
group by
id,
city_code
-- Keeps only groups whose aggregated duration is at least 2 (unit not shown here).
-- NOTE(review): the quoted stack trace passes through validateHavingClause and
-- SqlDotOperator.deriveType, which suggests this field access is where
-- validation fails - TODO confirm.
having
get_continuous_listen_duration(
dt,
(order_no is null)
or (trim(order_no) = '')
).duration >= 2
提交该 SQL 时(SQL 校验阶段)发生如下异常:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Incompatible
types
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method) ~[?:1.8.0_171]
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
~[?:1.8.0_171]
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
~[?:1.8.0_171]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
~[?:1.8.0_171]
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateHavingClause(SqlValidatorImpl.java:4214)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3515)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:52)
~[flink-table_2.11-1.11.0.jar:1.11.0]
at
com.ververica.flink.table.gateway.operation.MultiSqlOperation.lambda$executeInternal$0(MultiSqlOperation.java:119)
~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
at
com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:223)
~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
at
com.ververica.flink.table.gateway.operation.MultiSqlOperation.executeInternal(MultiSqlOperation.java:109)
~[flink-sql-gateway-0.1-SNAPSHOT.jar:?]
我的问题是:这样定义 UDAF 有什么问题吗?