Hi Yan & Timo, this is confirmed to be a bug and I’ve created an issue [1] for it.
I’ll explain more about this query. In Flink SQL/Table API, the DISTINCT keyword is implemented with an aggregation, which outputs a retract stream [2]. In that situation, all time-related fields are materialized as if they were common fields (with the timestamp type). Currently, because of a semantic problem, the time-windowed join cannot be performed on retract streams. But you could try a non-windowed join [3] after we fix this.

Best,
Xingcan

[1] https://issues.apache.org/jira/browse/FLINK-8897
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
[3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins

> On 8 Mar 2018, at 8:59 PM, Timo Walther <twal...@apache.org> wrote:
>
> Hi Xingcan,
>
> thanks for looking into this. This definitely seems to be a bug. Maybe in the
> org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case we
> should create an issue for it.
>
> Regards,
> Timo
>
>
> On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
>> Hi Xingcan,
>>
>> Thanks for your help. Attached is sample code that can reproduce the
>> problem.
>> When I was writing the sample code, I found that if I remove the `distinct`
>> keyword in the select clause, the AssertionError doesn't occur.
>>
>> String sql1 = "select distinct id, eventTs, count(*) over (partition by id
>> order by eventTs rows between 100 preceding and current row) as cnt1 from
>> myTable";
>>
>> Best
>> Yan
>> From: xccui-foxmail <xingc...@gmail.com>
>> Sent: Wednesday, March 7, 2018 8:10 PM
>> To: Yan Zhou [FDS Science]
>> Cc: user@flink.apache.org
>> Subject: Re: flink sql timed-window join throw "mismatched type"
>> AssertionError on rowtime column
>>
>> Hi Yan,
>>
>> I’d like to look into this. Can you share more about your queries and the
>> full stack trace?
>>
>> Thanks,
>> Xingcan
>>
>>> On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] <yz...@coupang.com> wrote:
>>>
>>> Hi experts,
>>> I am using the Flink Table API to join two tables, which are DataStreams
>>> underneath. However, I got an assertion error of "java.lang.AssertionError:
>>> mismatched type $1 TIMESTAMP(3)" on the rowtime column. Below are more details:
>>>
>>> There is only one Kafka data source, which is then converted to a Table and
>>> registered. One existing column is set as the rowtime (event time) attribute.
>>> Two over-window aggregation queries are run against the table, and two tables
>>> are created as results. Everything works great so far.
>>> However, when doing a timed-window join of the two result tables with the
>>> inherited rowtime, Calcite throws the "java.lang.AssertionError: mismatched
>>> type $1 TIMESTAMP(3)" AssertionError. Can someone let me know what the
>>> possible cause is? F.Y.I., I renamed the rowtime column for one of the result
>>> tables.
>>>
>>> DataStream<MyObject> dataStream = env.addSource(kafkaConsumer);
>>> Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...);
>>> tableEnv.registerTable(tableName, table);
>>> Table left = tableEnv.sqlQuery("select id, eventTime, count(*) over ... from ...");
>>> Table right = tableEnv.sqlQuery("select id as r_id, eventTime as r_event_time, count(*) over ... from ...");
>>> left.join(right).where("id = r_id && eventTime === r_event_time")
>>>     .addSink(...); // here Calcite throws the exception: java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)
>>>
>>> source table
>>>  |-- id: Long
>>>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>>  |-- ...
>>>
>>> result_1 table
>>>  |-- id: Long
>>>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>>  |-- ...
>>>
>>> result_2 table
>>>  |-- rid: Long
>>>  |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>>
>>>
>>> Best
>>> Yan