Thanks for the clarification, we can live with this restriction I
just wanted to make sure that I fully understand why we are getting
these errors and if there is any reasonable workaround.

Thanks again :)
Gyula

On Mon, Apr 20, 2020 at 4:21 PM Kurt Young <ykt...@gmail.com> wrote:

> According to the current implementation, yes you are right hive table
> source will always be bounded.
> But conceptually, we can't do this assumption. For example, we
> might further improve hive table source
> to also support unbounded cases, .e.g. monitoring hive tables and always
> read newly appeared data.
> So right now, Flink relies on the "global flag" to distinguish whether the
> table should be treated as static
> or dynamically changing.
>
> The "global flag" is whether you are using `BatchTableEnvironment` or
> `StreamTableEnvironment` (old versions)
> and EnvironmentSettings's batchMode or streamingMode (newer versions).
>
> But we should admit that Flink hasn't finish the unification work. Your
> case will also be considered in the
> future when we want to further unify and simplify these concepts and
> usages.
>
> Best,
> Kurt
>
>
> On Mon, Apr 20, 2020 at 10:09 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> The HiveTableSource (and many others) return isBounded() -> true.
>> In this case it is not even possible for it to change over time, so I am
>> a bit confused.
>>
>> To me it sounds like you should always be able to join a stream against a
>> bounded table, temporal or not it is pretty well defined.
>> Maybe there is some fundamental concept that I dont understand, I don't
>> have much experience with this to be fair.
>>
>> Gyula
>>
>> On Mon, Apr 20, 2020 at 4:03 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> The reason here is Flink doesn't know the hive table is static. After
>>> you create these two tables and
>>> trying to join them, Flink will assume both table will be changing with
>>> time.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> The problem here is that I dont have a temporal table.
>>>>
>>>> I have a regular stream from kafka (with even time attribute) and a
>>>> static table in hive.
>>>> The Hive table is static, it doesn't change. It doesn't have any time
>>>> attribute, it's not temporal.
>>>>
>>>> Gyula
>>>>
>>>> On Mon, Apr 20, 2020 at 3:43 PM godfrey he <godfre...@gmail.com> wrote:
>>>>
>>>>> Hi Gyual,
>>>>>
>>>>> Can you convert the regular join to lookup join (temporal join) [1],
>>>>> and then you can use window aggregate.
>>>>>
>>>>> >  I understand that the problem is that we cannot join with the Hive
>>>>> table and still maintain the watermark/even time column. But why is this?
>>>>> Regular join can't maintain the time attribute as increasing trend
>>>>> (one record may be joined with a very old record),
>>>>> that means the watermark does not also been guaranteed to increase.
>>>>>
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>>>
>>>>> Best,
>>>>> Godfrey
>>>>>
>>>>> Gyula Fóra <gyula.f...@gmail.com> 于2020年4月20日周一 下午4:46写道:
>>>>>
>>>>>> Hi All!
>>>>>>
>>>>>> We hit a the following problem with SQL and trying to understand if
>>>>>> there is a valid workaround.
>>>>>>
>>>>>> We have 2 tables:
>>>>>>
>>>>>> *Kafka*
>>>>>> timestamp (ROWTIME)
>>>>>> item
>>>>>> quantity
>>>>>>
>>>>>> *Hive*
>>>>>> item
>>>>>> price
>>>>>>
>>>>>> So we basically have incoming (ts, id, quantity) and we want to join
>>>>>> it with the hive table to get the total price (price * quantity) got the
>>>>>> current item.
>>>>>>
>>>>>> After this we want to create window aggregate on quantity*price
>>>>>> windowed on timestamp (event time attribute).
>>>>>>
>>>>>> In any way we formulate this query we hit the following error:
>>>>>> org.apache.flink.table.api.TableException: Rowtime attributes must
>>>>>> not be in the input rows of a regular join. As a workaround you can cast
>>>>>> the time attributes of input tables to TIMESTAMP before.
>>>>>>
>>>>>>  I understand that the problem is that we cannot join with the Hive
>>>>>> table and still maintain the watermark/even time column. But why is this?
>>>>>>
>>>>>> In datastream world I would just simply assign Max watermark to my
>>>>>> enrichment input and join outputs will get the ts of the input record. 
>>>>>> Can
>>>>>> I achieve something similar in SQL/Table api?
>>>>>>
>>>>>> Thank you!
>>>>>> Gyula
>>>>>>
>>>>>>

Reply via email to