Hi Yan & Timo,

this is confirmed to be a bug and I’ve created an issue [1] for it.

I’ll explain more about this query. In Flink SQL / Table API, the DISTINCT 
keyword is implemented with an aggregation, which produces a retract stream 
[2]. In that situation, all time attributes are materialized as if they were 
ordinary fields (with the TIMESTAMP type). Currently, due to these semantics, 
the time-windowed join cannot be performed on retract streams, but you could 
try a non-windowed join [3] once we fix this.
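
For reference, here is a minimal sketch of such a non-windowed join in SQL, assuming the two result tables are registered as result_1 and result_2 and keeping the column names from your schemas below (the selected columns are only an example):

  SELECT l.id, l.eventTime, r.r_event_time
  FROM result_1 AS l
  JOIN result_2 AS r
    ON l.id = r.r_id

Note that a non-windowed join keeps both inputs in state indefinitely, so you will probably want to configure an idle state retention time for the query.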

Best,
Xingcan

[1] https://issues.apache.org/jira/browse/FLINK-8897
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#table-to-stream-conversion
[3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins

> On 8 Mar 2018, at 8:59 PM, Timo Walther <twal...@apache.org> wrote:
> 
> Hi Xingcan,
> 
> thanks for looking into this. This definitely seems to be a bug. Maybe in the 
> org.apache.flink.table.calcite.RelTimeIndicatorConverter. In any case we 
> should create an issue for it.
> 
> Regards,
> Timo
> 
> 
> On 3/8/18 at 7:27 AM, Yan Zhou [FDS Science] wrote:
>> Hi Xingcan,
>> 
>> Thanks for your help. Attached is a sample program that reproduces the 
>> problem. While writing it, I noticed that if I remove the `distinct` keyword 
>> from the select clause, the AssertionError does not occur.
>> 
>> String sql1 = "select distinct id, eventTs, count(*) over (partition by id 
>> order by eventTs rows between 100 preceding and current row) as cnt1 from 
>> myTable";
>> 
>> Best
>> Yan
>> From: xccui-foxmail <xingc...@gmail.com>
>> Sent: Wednesday, March 7, 2018 8:10 PM
>> To: Yan Zhou [FDS Science]
>> Cc: user@flink.apache.org
>> Subject: Re: flink sql timed-window join throw "mismatched type" 
>> AssertionError on rowtime column
>>  
>> Hi Yan,
>> 
>> I’d like to look into this. Can you share more about your queries and the 
>> full stack trace?
>> 
>> Thanks,
>> Xingcan
>> 
>>> On 8 Mar 2018, at 11:28 AM, Yan Zhou [FDS Science] <yz...@coupang.com> wrote:
>>> 
>>> Hi experts, 
>>> I am using the Flink Table API to join two tables, which are DataStreams 
>>> underneath. However, I get "java.lang.AssertionError: mismatched type $1 
>>> TIMESTAMP(3)" on the rowtime column. Here are more details:
>>> 
>>> There is only one Kafka data source, which is converted to a Table and 
>>> registered. One existing column is set as the rowtime (event time) attribute. 
>>> Two over-window aggregation queries are run against that table, producing two 
>>> result tables. Everything works great so far.
>>> However, when time-window joining the two result tables on the inherited 
>>> rowtime, Calcite throws the "java.lang.AssertionError: mismatched type $1 
>>> TIMESTAMP(3)" AssertionError. Can someone let me know the possible cause? 
>>> F.Y.I., I renamed the rowtime column for one of the result tables.
>>> 
>>> DataStream<MyObject> dataStream = env.addSource(kafkaConsumer);
>>> Table table = tableEnv.fromDataStream(dataStream, "col1", "col2", ...);
>>> tableEnv.registerTable(tableName, table);
>>> Table left = tableEnv.sqlQuery("select id, eventTime, count(*) over ... from ...");
>>> Table right = tableEnv.sqlQuery("select id as r_id, eventTime as r_event_time, count(*) over ... from ...");
>>> left.join(right).where("id === r_id && eventTime === r_event_time")
>>>     .addSink(...); // here Calcite throws: java.lang.AssertionError: mismatched type $1 TIMESTAMP(3)
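>>> 
>>> For context, the time-windowed join expressed in SQL would look roughly like the sketch below (assuming the two result tables are registered as result_1 and result_2; the one-minute bound is only an example):
>>> 
>>> SELECT l.id, l.eventTime, r.r_event_time
>>> FROM result_1 AS l, result_2 AS r
>>> WHERE l.id = r.r_id
>>>   AND l.eventTime BETWEEN r.r_event_time - INTERVAL '1' MINUTE
>>>                       AND r.r_event_time + INTERVAL '1' MINUTE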
>>> 
>>> source table
>>>  |-- id: Long
>>>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>>  |-- ...
>>>  
>>> result_1 table
>>>  |-- id: Long
>>>  |-- eventTime: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>>  |-- ...
>>>  
>>> result_2 table
>>>  |-- r_id: Long
>>>  |-- r_event_time: TimeIndicatorTypeInfo(rowtime)
>>>  |-- ...
>>> 
>>> 
>>> Best
>>> Yan
>> 
> 
> 
