Hi Yan, This is a bug in flink. As a workaround, you can cast eventTime to other basic sql types(for example, cast eventTime as varchar).
@Timo and @Xingcan, I think we have to materialize time indicators in conditions of LogicalFilter. I created an issue and we can have more discussions there[1]. [1] https://issues.apache.org/jira/browse/FLINK-8898 Best, Hequn On Thu, Mar 8, 2018 at 8:59 PM, Timo Walther <twal...@apache.org> wrote: > Hi Xingcan, > > thanks for looking into this. This definitely seems to be a bug. Maybe in > the org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case > we should create an issue for it. > > Regards, > Timo > > > Am 3/8/18 um 7:27 AM schrieb Yan Zhou [FDS Science]: > > Hi Xingcan, > > > Thanks for your help. Attached is a sample code that can reproduce the > problem. > > When I was writing the sample code, if I remove the `distinct` keyword in > select clause, the AssertionError doesn't occur. > > > *String sql1 = "select distinct id, eventTs, count(*) over (partition by > id order by eventTs rows between 100 preceding and current row) as cnt1 > from myTable";* > > > Best > Yan > ------------------------------ > *From:* xccui-foxmail <xingc...@gmail.com> <xingc...@gmail.com> > *Sent:* Wednesday, March 7, 2018 8:10 PM > *To:* Yan Zhou [FDS Science] > *Cc:* user@flink.apache.org > *Subject:* Re: flink sql timed-window join throw "mismatched type" > AssertionError on rowtime column > > Hi Yan, > > I’d like to look into this. Can you share more about your queries and the > full stack trace? > > Thank, > Xingcan > > On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] <yz...@coupang.com> > wrote: > > Hi experts, > I am using flink table api to join two tables, which are datastream > underneath. However, I got an assertion error of "java.lang.AssertionError: > mismatched type $1 TIMESTAMP(3)" on rowtime column. Below is more details: > > There in only one kafka data source, which is then converted to Table and > registered. One existed column is set as rowtime(event time) attribute. Two > over-window aggregation queries are run against the table and two tables > are created as results. Everything works great so far. > However when timed-window joining two result tables with inherented > rowtime, calcite throw the "java.lang.AssertionError: mismatched type $1 > TIMESTAMP(3)" AssertionError. Can someone let me know what is the > possible cause? F.Y.I., I rename the rowtime column for one of the result > table. > > DataStream<MyObject> dataStream = env.addSource(kafkaConsumer); > > Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...); > > tableEnv.registerTable(tableName, table); > > Table left = tableEnv.sqlQuery("select id, *eventTime*,count (*) over > ... from ..."); > > Table right = tableEnv.sqlQuery("select id as r_id, *eventTime as > r_event_time*, count (*) over ... from ..."); > > left.join(right).where("id = r_id && eventTime === r_event_time) > > .addSink(...); // here calcite throw exception: java.lang.AssertionError: > mismatched type $1 TIMESTAMP(3) > > source table > |-- id: Long > |-- eventTime: TimeIndicatorTypeInfo(rowtime) > |-- ... > |-- ... > > result_1 table > |-- id: Long > |-- eventTime: TimeIndicatorTypeInfo(rowtime) > |-- ... > |-- ... > > result_2 table > |-- rid: Long > |-- r_event_time: TimeIndicatorTypeInfo(rowtime) > |-- ... > > > Best > Yan > > > >