Hi Gyula,

Could you convert the regular join to a lookup join (temporal table join) [1]?
After that you can use a window aggregate.
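
To illustrate the idea outside of Flink, here is a minimal plain-Python sketch
of a lookup-style join followed by a tumbling window sum. It is not Flink code;
the PRICES dict, the sample orders, and the 10-second window size are all
hypothetical stand-ins, and whether the Hive table can actually act as a lookup
source depends on the connector and Flink version.

```python
from collections import defaultdict

# Hypothetical stand-in for the Hive table (item -> price).
PRICES = {"apple": 2.0, "pear": 3.0}


def lookup_join(orders, prices):
    """Enrich each order with the price visible when the order is processed.

    Each input row yields at most one output row that keeps the input's
    timestamp, so output timestamps follow the input order.
    """
    for ts, item, quantity in orders:
        if item in prices:
            yield ts, item, quantity * prices[item]


def tumbling_sum(rows, size):
    """Sum the totals per tumbling event-time window of `size` seconds."""
    totals = defaultdict(float)
    for ts, _item, total in rows:
        window_start = ts - ts % size
        totals[window_start] += total
    return dict(totals)


# Hypothetical (ts, item, quantity) rows from the Kafka side.
orders = [(1, "apple", 3), (4, "pear", 1), (11, "apple", 2)]
result = tumbling_sum(lookup_join(orders, PRICES), size=10)
print(result)
```

Because only the order stream drives the output, the event-time column stays
usable for the window step.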

>  I understand that the problem is that we cannot join with the Hive table
> and still maintain the watermark/event time column. But why is this?
A regular join can't keep the time attribute increasing (a record may be
joined with a very old record), which means the watermark is no longer
guaranteed to increase either.
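
A small plain-Python simulation may make this concrete. It is only a sketch of
the regular-join semantics, not Flink code; the event list and the
regular_join helper are hypothetical.

```python
def regular_join(events):
    """Simulate a streaming regular (inner) join on `item`.

    Both sides are kept in state; a new row on either side is joined
    against everything already stored on the other side.
    """
    orders, prices, out = [], [], []
    for side, row in events:
        if side == "order":
            ts, item, qty = row
            orders.append(row)
            out += [(ts, item, qty * p) for i, p in prices if i == item]
        else:
            item, price = row
            prices.append(row)
            out += [(ts, i, qty * price) for ts, i, qty in orders if i == item]
    return out


events = [
    ("order", (100, "apple", 1)),
    ("price", ("pear", 3.0)),
    ("order", (200, "pear", 2)),
    ("price", ("apple", 2.0)),  # arrives late, matches the order at ts=100
]
joined = regular_join(events)
timestamps = [ts for ts, _, _ in joined]
print(timestamps)  # [200, 100]
```

The late price row produces an output with ts=100 after an output with ts=200
has already been emitted, so the output timestamps are not increasing.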

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table

Best,
Godfrey

Gyula Fóra <gyula.f...@gmail.com> wrote on Mon, Apr 20, 2020 at 4:46 PM:

> Hi All!
>
> We hit the following problem with SQL and are trying to understand if
> there is a valid workaround.
>
> We have 2 tables:
>
> *Kafka*
> timestamp (ROWTIME)
> item
> quantity
>
> *Hive*
> item
> price
>
> So we basically have incoming (ts, id, quantity) and we want to join it
> with the Hive table to get the total price (price * quantity) for the
> current item.
>
> After this we want to create a window aggregate on quantity*price, windowed
> on timestamp (the event time attribute).
>
> However we formulate this query, we hit the following error:
> org.apache.flink.table.api.TableException: Rowtime attributes must not be
> in the input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before.
>
>  I understand that the problem is that we cannot join with the Hive table
> and still maintain the watermark/event time column. But why is this?
>
> In the DataStream world I would simply assign a max watermark to my
> enrichment input, and the join outputs would get the ts of the input record.
> Can I achieve something similar in the SQL/Table API?
>
> Thank you!
> Gyula
>
>
