Hi! The problem here is that I dont have a temporal table.
I have a regular stream from kafka (with even time attribute) and a static table in hive. The Hive table is static, it doesn't change. It doesn't have any time attribute, it's not temporal. Gyula On Mon, Apr 20, 2020 at 3:43 PM godfrey he <godfre...@gmail.com> wrote: > Hi Gyual, > > Can you convert the regular join to lookup join (temporal join) [1], > and then you can use window aggregate. > > > I understand that the problem is that we cannot join with the Hive > table and still maintain the watermark/even time column. But why is this? > Regular join can't maintain the time attribute as increasing trend (one > record may be joined with a very old record), > that means the watermark does not also been guaranteed to increase. > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table > > Best, > Godfrey > > Gyula Fóra <gyula.f...@gmail.com> 于2020年4月20日周一 下午4:46写道: > >> Hi All! >> >> We hit a the following problem with SQL and trying to understand if there >> is a valid workaround. >> >> We have 2 tables: >> >> *Kafka* >> timestamp (ROWTIME) >> item >> quantity >> >> *Hive* >> item >> price >> >> So we basically have incoming (ts, id, quantity) and we want to join it >> with the hive table to get the total price (price * quantity) got the >> current item. >> >> After this we want to create window aggregate on quantity*price windowed >> on timestamp (event time attribute). >> >> In any way we formulate this query we hit the following error: >> org.apache.flink.table.api.TableException: Rowtime attributes must not be >> in the input rows of a regular join. As a workaround you can cast the time >> attributes of input tables to TIMESTAMP before. >> >> I understand that the problem is that we cannot join with the Hive table >> and still maintain the watermark/even time column. But why is this? >> >> In datastream world I would just simply assign Max watermark to my >> enrichment input and join outputs will get the ts of the input record. Can >> I achieve something similar in SQL/Table api? >> >> Thank you! >> Gyula >> >>