Hi Gyula,

You can convert the regular join into a lookup join (temporal join) [1]; after that, the window aggregate will work.
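For illustration, a minimal sketch of what that could look like in Flink SQL, assuming hypothetical table names (`orders` for the Kafka table, `prices` for the Hive table) and that `orders` declares both a rowtime attribute `ts` and a processing-time attribute `proc_time` — the lookup join keys on `proc_time`, so `ts` remains a valid event-time attribute for the aggregate:

```sql
-- Lookup join: enrich each order with the current price.
-- FOR SYSTEM_TIME AS OF a processing-time attribute keeps the
-- probe side's rowtime attribute (ts) intact, unlike a regular join.
SELECT
  TUMBLE_START(o.ts, INTERVAL '1' HOUR) AS window_start,
  SUM(o.quantity * p.price)             AS total_price
FROM orders AS o
JOIN prices FOR SYSTEM_TIME AS OF o.proc_time AS p
  ON o.item = p.item
GROUP BY TUMBLE(o.ts, INTERVAL '1' HOUR);
```

The table and column names and the one-hour tumbling window are assumptions, not from the original thread; the point is only the `FOR SYSTEM_TIME AS OF` shape of the join.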
> I understand that the problem is that we cannot join with the Hive table
> and still maintain the watermark/event time column. But why is this?

A regular join cannot keep the time attribute monotonically increasing
(one record may be joined with a very old record), which means the
watermark is also not guaranteed to increase.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table

Best,
Godfrey

Gyula Fóra <gyula.f...@gmail.com> wrote on Monday, April 20, 2020 at 4:46 PM:

> Hi All!
>
> We hit the following problem with SQL and are trying to understand if
> there is a valid workaround.
>
> We have 2 tables:
>
> *Kafka*
> timestamp (ROWTIME)
> item
> quantity
>
> *Hive*
> item
> price
>
> So we basically have incoming (ts, id, quantity) records and we want to
> join them with the Hive table to get the total price (price * quantity)
> for the current item.
>
> After this we want to compute a window aggregate on quantity * price,
> windowed on timestamp (the event time attribute).
>
> Any way we formulate this query, we hit the following error:
> org.apache.flink.table.api.TableException: Rowtime attributes must not be
> in the input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before.
>
> I understand that the problem is that we cannot join with the Hive table
> and still maintain the watermark/event time column. But why is this?
>
> In the DataStream world I would simply assign the MAX watermark to my
> enrichment input, and the join outputs would get the ts of the input
> record. Can I achieve something similar in SQL/Table API?
>
> Thank you!
> Gyula