我的原SQL:
CREATE TABLE consumer_session_created
(
consumer ROW (consumerUuid STRING),
clientIp STRING,
deviceId STRING,
eventInfo ROW < eventTime BIGINT >,
ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime / 1000, 'yyyy-MM-dd
HH:mm:ss')),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH ( 'connector'='kafka'
,'topic'='local.dwh.paylater.consumer.session.consumer-session-created.v1'
,'properties.bootstrap.servers'='http://localhost:9092' ,'
properties.group.id'='flink-ato-trusted-consumer'
,'scan.startup.mode'='latest-offset'
,'properties.allow.auto.create.topics'='false' ,'format'='avro-confluent'
,'avro-confluent.basic-auth.credentials-source'='null'
,'avro-confluent.basic-auth.user-info'='null'
,'avro-confluent.schema-registry.url'='http://localhost:8081'
,'avro-confluent.schema-registry.subject'='local.dwh.paylater.consumer.session.consumer-session-created.v1')

CREATE
TEMPORARY VIEW consumer_session_created_detail as (
SELECT
csc.consumer.consumerUuid as consumer_consumerUuid,
csc.deviceId as deviceId,
csc.clientIp as clientIp,
csc.eventInfo.eventTime as eventInfo_eventTime
FROM consumer_session_created csc
)

SELECT
consumer_consumerUuid AS entity_id,
COUNT(DISTINCT deviceId) OVER w AS
sp_c_distinct_device_cnt_by_consumer_id_h1_0,
COUNT (DISTINCT clientIp) OVER w AS sp_d_distinct_ip_cnt_by_consumer_id_h1_0
FROM consumer_session_created_detail
WINDOW w AS (
PARTITION BY consumer_consumerUuid
ORDER BY eventInfo_eventTime
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)

报的错:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: org.apache.flink.table.api.ValidationException: SQL
validation failed. From line 9, column 15 to line 9, column 31: Data Type
mismatch between ORDER BY and RANGE clause

at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)

at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)

at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:207)

at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:715)

at aptflinkjobs.stream.SQLStreamer.lambda$execute$1(SQLStreamer.java:149)

at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)

at aptflinkjobs.stream.SQLStreamer.execute(SQLStreamer.java:141)

at aptflinkjobs.stream.SQLStreamer.main(SQLStreamer.java:296)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)

回复