The reason is that Flink doesn't know the Hive table is static. After you
create these two tables and
try to join them, Flink assumes that both tables will change over
time.
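
To illustrate, here is a toy sketch in plain Python. It is not Flink code: the
function `regular_join` and the sample events are made up for this example. It
models a regular join that has to assume both inputs can still receive rows,
so it keeps all rows from both sides and emits matches whenever either side
gets a new one.

```python
from collections import defaultdict


def regular_join(events):
    """Toy regular inner join on `item`; both inputs are treated as changing."""
    orders = defaultdict(list)  # item -> [(ts, quantity)], kept indefinitely
    prices = defaultdict(list)  # item -> [price], kept indefinitely
    out = []
    for event in events:
        if event[0] == "order":
            _, ts, item, quantity = event
            orders[item].append((ts, quantity))
            # A new order joins with every price row seen so far.
            for price in prices[item]:
                out.append((ts, item, quantity * price))
        else:  # "price"
            _, item, price = event
            prices[item].append(price)
            # A new price row joins with every stored order, however old.
            for ts, quantity in orders[item]:
                out.append((ts, item, quantity * price))
    return out


# Hypothetical sample input: the last event is a late row on the price side.
events = [
    ("price", "apple", 2.0),
    ("order", 100, "apple", 3),
    ("order", 200, "apple", 1),
    ("price", "apple", 2.5),
]

results = regular_join(events)
timestamps = [ts for ts, _, _ in results]
print(timestamps)
```

The emitted timestamps come out as 100, 200, 100, 200: the late price row
re-joins the old orders, so the output timestamps no longer increase and the
rowtime column cannot stay a time attribute after such a join.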

Best,
Kurt


On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi!
>
> The problem here is that I don't have a temporal table.
>
> I have a regular stream from Kafka (with an event time attribute) and a static
> table in hive.
> The Hive table is static, it doesn't change. It doesn't have any time
> attribute, it's not temporal.
>
> Gyula
>
> On Mon, Apr 20, 2020 at 3:43 PM godfrey he <godfre...@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Can you convert the regular join to a lookup join (temporal table join) [1]?
>> Then you can use a window aggregate.
>>
>> > I understand that the problem is that we cannot join with the Hive
>> > table and still maintain the watermark/event time column. But why is this?
>>
>> A regular join can't keep the time attribute monotonically increasing (one
>> record may be joined with a very old record),
>> which means the watermark is not guaranteed to increase either.
>>
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
>>
>> Best,
>> Godfrey
>>
>> On Mon, Apr 20, 2020 at 4:46 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hi All!
>>>
>>> We hit the following problem with SQL and are trying to understand if
>>> there is a valid workaround.
>>>
>>> We have 2 tables:
>>>
>>> *Kafka*
>>> timestamp (ROWTIME)
>>> item
>>> quantity
>>>
>>> *Hive*
>>> item
>>> price
>>>
>>> So we basically have incoming (ts, id, quantity) and we want to join it
>>> with the Hive table to get the total price (price * quantity) for the
>>> current item.
>>>
>>> After this we want to create a window aggregate on quantity*price, windowed
>>> on timestamp (the event time attribute).
>>>
>>> However we formulate this query, we hit the following error:
>>> org.apache.flink.table.api.TableException: Rowtime attributes must not
>>> be in the input rows of a regular join. As a workaround you can cast the
>>> time attributes of input tables to TIMESTAMP before.
>>>
>>> I understand that the problem is that we cannot join with the Hive
>>> table and still maintain the watermark/event time column. But why is this?
>>>
>>> In the DataStream world I would simply assign a max watermark to my
>>> enrichment input, and the join outputs would get the ts of the input record.
>>> Can I achieve something similar in the SQL/Table API?
>>>
>>> Thank you!
>>> Gyula
>>>
>>>