Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
Thanks Timo,
I can see why this is pretty complicated to solve nicely at the moment (and
in general).
We will work around this for now, and we look forward to helping make this
better in the future!

Gyula


Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Timo Walther

Hi Gyula,

first of all, the exception

```
org.apache.flink.table.api.TableException: Rowtime attributes must not
be in the input rows of a regular join. As a workaround you can cast the
time attributes of input tables to TIMESTAMP before.
```

is IMHO one of the biggest shortcomings that we currently have in the 
planners due to internals around time interval joins [0]. But this is a 
different topic.
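
Just to make the suggested workaround concrete, here is a minimal sketch
(a rough Table API example for the Blink planner; the table and column names
are made up): casting the rowtime attribute to TIMESTAMP lets the regular
join pass, but the result then carries a plain timestamp instead of a time
attribute, so event-time windows can no longer be applied on it afterwards,
which is exactly the shortcoming above.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class CastWorkaroundSketch {
    public static void main(String[] args) {
        // Blink planner in streaming mode (Flink 1.10 style).
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Hypothetical tables: KafkaOrders(ts ROWTIME, item, quantity) and
        // HivePrices(item, price) are assumed to be registered already.
        Table enriched = tEnv.sqlQuery(
                "SELECT CAST(o.ts AS TIMESTAMP(3)) AS ts_plain, " +
                "       o.item, o.quantity * p.price AS total " +
                "FROM KafkaOrders o " +
                "JOIN HivePrices p ON o.item = p.item");

        // The join is accepted because ts_plain is a plain TIMESTAMP, but no
        // event-time window can be defined on it anymore.
        tEnv.createTemporaryView("EnrichedWithoutRowtime", enriched);
    }
}
```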


I think in theory Gyula is right; however, we would need to store the
static table somewhere in order to perform lookups while the stream is
passing by. And while checking the time attributes, we would need to know
which table is bounded and what kind of changes are coming into the
streaming table.


There is still a lot of work in the future to make the concepts smoother.

Regards,
Timo


[0] https://issues.apache.org/jira/browse/FLINK-10211


Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
Thanks for the clarification, we can live with this restriction. I
just wanted to make sure that I fully understand why we are getting
these errors and whether there is any reasonable workaround.

Thanks again :)
Gyula



Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Kurt Young
According to the current implementation, yes, you are right: the hive table
source will always be bounded.
But conceptually, we can't make this assumption. For example, we might
further improve the hive table source to also support unbounded cases,
e.g. monitoring hive tables and always reading newly appeared data.
So right now, Flink relies on the "global flag" to distinguish whether the
table should be treated as static or dynamically changing.

The "global flag" is whether you are using `BatchTableEnvironment` or
`StreamTableEnvironment` (older versions),
and EnvironmentSettings' batchMode or streamingMode (newer versions).
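
For reference, a rough sketch of how that flag is chosen with the newer API
(the class and method names are the real ones, everything else is
illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class GlobalFlagSketch {
    public static void main(String[] args) {
        // Streaming mode: every registered table is treated as potentially
        // changing over time, which is why the hive table is not assumed static.
        EnvironmentSettings streaming = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        TableEnvironment streamingEnv = TableEnvironment.create(streaming);

        // Batch mode: all inputs are treated as bounded, but then the Kafka
        // topic cannot be consumed as an unbounded stream.
        EnvironmentSettings batch = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inBatchMode().build();
        TableEnvironment batchEnv = TableEnvironment.create(batch);
    }
}
```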

But we should admit that Flink hasn't finished the unification work. Your
case will also be considered in the future when we want to further unify
and simplify these concepts and usages.

Best,
Kurt




Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
The HiveTableSource (and many others) returns isBounded() -> true.
In this case it is not even possible for it to change over time, so I am a
bit confused.

To me it sounds like you should always be able to join a stream against a
bounded table; temporal or not, it is pretty well defined.
Maybe there is some fundamental concept that I don't understand; I don't
have much experience with this, to be fair.

Gyula




Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Kurt Young
The reason here is that Flink doesn't know the hive table is static. After
you create these two tables and try to join them, Flink will assume both
tables will be changing with time.

Best,
Kurt




Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
Hi!

The problem here is that I don't have a temporal table.

I have a regular stream from kafka (with an event time attribute) and a
static table in hive.
The Hive table is static, it doesn't change. It doesn't have any time
attribute, it's not temporal.

Gyula



Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread godfrey he
Hi Gyula,

Can you convert the regular join to a lookup join (temporal join) [1]?
Then you can use the window aggregate.

>  I understand that the problem is that we cannot join with the Hive table
> and still maintain the watermark/event time column. But why is this?
A regular join can't maintain the time attribute as an increasing trend (one
record may be joined with a very old record),
which means the watermark is also not guaranteed to increase.

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
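
A rough sketch of what [1] describes, assuming the enrichment table is backed
by a connector that supports lookups (e.g. JDBC; I am not sure the Hive
source does), that the Kafka table also declares a processing-time attribute,
and with all table and column names made up:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class LookupJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build());

        // Hypothetical tables: KafkaOrders(ts (rowtime), proc AS PROCTIME(), item, quantity)
        // and a lookup-capable Prices(item, price) are assumed to be registered already.
        // The lookup is keyed on processing time, so the rowtime attribute of the probe
        // side survives and the event-time window below is still allowed.
        Table result = tEnv.sqlQuery(
                "SELECT TUMBLE_START(o.ts, INTERVAL '1' MINUTE) AS w_start, " +
                "       o.item, SUM(o.quantity * p.price) AS total " +
                "FROM KafkaOrders AS o " +
                "JOIN Prices FOR SYSTEM_TIME AS OF o.proc AS p " +
                "  ON o.item = p.item " +
                "GROUP BY TUMBLE(o.ts, INTERVAL '1' MINUTE), o.item");
        tEnv.createTemporaryView("PricedPerMinute", result);
    }
}
```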

Best,
Godfrey

>


Joining table with row attribute against an enrichment table

2020-04-20 Thread Gyula Fóra
Hi All!

We hit the following problem with SQL and are trying to understand if there
is a valid workaround.

We have 2 tables:

*Kafka*
timestamp (ROWTIME)
item
quantity

*Hive*
item
price

So we basically have incoming (ts, id, quantity) and we want to join it
with the hive table to get the total price (price * quantity) for the
current item.

After this we want to create a window aggregate on quantity*price, windowed
on timestamp (the event time attribute).

Whichever way we formulate this query, we hit the following error:
org.apache.flink.table.api.TableException: Rowtime attributes must not be
in the input rows of a regular join. As a workaround you can cast the time
attributes of input tables to TIMESTAMP before.
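
For reference, the query is roughly shaped like this (a simplified sketch,
the table names and DDL are not the real ones):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class EnrichAndWindowSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build());

        // Hypothetical tables: KafkaOrders(ts, item, quantity) with ts declared as
        // rowtime (WATERMARK FOR ts ...) and a Hive table HivePrices(item, price)
        // are assumed to be registered already.

        // Step 1: enrich the stream with the static price. This is the regular join
        // that the planner rejects once the query is optimized, because ts is a
        // rowtime attribute.
        Table enriched = tEnv.sqlQuery(
                "SELECT o.ts, o.item, o.quantity * p.price AS total " +
                "FROM KafkaOrders o JOIN HivePrices p ON o.item = p.item");
        tEnv.createTemporaryView("Enriched", enriched);

        // Step 2: the event-time window aggregate we actually want on top of it.
        Table aggregated = tEnv.sqlQuery(
                "SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS w_start, " +
                "       item, SUM(total) AS total " +
                "FROM Enriched " +
                "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), item");
        tEnv.createTemporaryView("Aggregated", aggregated);
    }
}
```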

I understand that the problem is that we cannot join with the Hive table
and still maintain the watermark/event time column. But why is this?

In the DataStream world I would simply assign a MAX watermark to my
enrichment input, and the join outputs would get the ts of the input record.
Can I achieve something similar in the SQL/Table API?
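
For example, something along these lines is what I have in mind on the
DataStream side (a rough sketch with made-up types, using the pre-1.11
periodic watermark API):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MaxWatermarkEnrichmentSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Made-up enrichment input (item, price); in reality this would come from Hive.
        DataStream<Tuple2<String, Double>> prices =
                env.fromElements(Tuple2.of("itemA", 1.5), Tuple2.of("itemB", 2.0));

        // Emit Long.MAX_VALUE as the watermark so this bounded side never holds back
        // the event time of whatever it is connected/joined with downstream.
        DataStream<Tuple2<String, Double>> pricesWithMaxWatermark = prices
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Double>>() {
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(Long.MAX_VALUE);
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Double> element, long previousTimestamp) {
                        return Long.MIN_VALUE; // enrichment records have no event time of their own
                    }
                });

        // The (ts, item, quantity) order stream keeps its own timestamps/watermarks and
        // would then be keyed by item and connected with pricesWithMaxWatermark
        // (e.g. via a KeyedCoProcessFunction) to do the actual enrichment, omitted here.
    }
}
```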

Thank you!
Gyula