Re: LookupableTableSource question

2019-07-02 Thread JingsongLee
> how do I enable Blink planner support? 
After the Flink 1.9 release, you can try the Blink planner.

>Since when is LATERAL TABLE available in Flink? Is it equivalent to using 
>temporal tables?
LATERAL TABLE applies a table function to each row of a table; it has been 
available in Flink for a long time [1].
It is different from a temporal table.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/udfs.html#table-functions
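For comparison, here is a plain-Java sketch (no Flink dependency) of what a LATERAL TABLE join computes. The names `split` and `lateralJoin` are hypothetical. The table function is evaluated once per outer row, and each row it emits is paired with that outer row. A temporal table join instead matches each row against the version of another table valid at that row's time, and this sketch does not model that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration (no Flink dependency) of a LATERAL TABLE join: the table
// function runs once per outer row, and every row it emits is paired with that row.
final class LateralTableSketch {

    // Hypothetical table function, like a "split" UDTF: one input value, zero or more output rows.
    static List<String> split(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.split(" ")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    // leftOuter = false: outer rows with no output are dropped, as in
    //   SELECT ... FROM t, LATERAL TABLE(split(a)) AS T(word)
    // leftOuter = true: they are kept paired with null, as in
    //   SELECT ... FROM t LEFT JOIN LATERAL TABLE(split(a)) AS T(word) ON TRUE
    static List<String[]> lateralJoin(List<String> outer,
                                      Function<String, List<String>> tableFunction,
                                      boolean leftOuter) {
        List<String[]> joined = new ArrayList<>();
        for (String row : outer) {
            List<String> produced = tableFunction.apply(row);
            if (produced.isEmpty() && leftOuter) {
                joined.add(new String[] {row, null});
            }
            for (String value : produced) {
                joined.add(new String[] {row, value});
            }
        }
        return joined;
    }
}
```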

Best, JingsongLee



Re: LookupableTableSource question

2019-07-01 Thread Flavio Pompermaier
I probably misunderstood the meaning of eval()... so it is called once for
every distinct key (which could be composed of a combination of fields)?
So, the other questions are: how do I enable Blink planner support?
Since when has LATERAL TABLE been available in Flink? Is it equivalent to using
temporal tables [1]?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html

Best,
Flavio

On Sat, Jun 29, 2019 at 3:16 AM JingsongLee  wrote:

> The keys mean a joint (composite) primary key, not a list of keys. In your
> case, maybe there is a single key?
>
> Best, Jingsong Lee


Re: LookupableTableSource question

2019-06-28 Thread JingsongLee
The keys mean a joint (composite) primary key, not a list of keys. In your 
case, maybe there is a single key?
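To illustrate, here is a minimal plain-Java sketch, assuming `Arrays.asList(keys)` as a stand-in for `Row.of(keys)` (both compare element by element). The class name and data are hypothetical. All values passed to `eval` form one composite key, so (1, "IT") and (1, "FR") are separate entries, while a lookup on 1 alone finds nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink dependency): the values passed to eval(Object... keys)
// are the join-key fields of ONE probe row and together form a single composite key.
// Arrays.asList(keys) stands in for Row.of(keys); both compare element by element.
final class CompositeKeyLookupSketch {

    // Hypothetical lookup data, keyed by the full tuple of key values, e.g. (id, country).
    private final Map<List<Object>, List<String>> table = new HashMap<>();

    void add(String row, Object... keys) {
        table.computeIfAbsent(Arrays.asList(keys), k -> new ArrayList<>()).add(row);
    }

    // One lookup for the whole tuple, not one lookup per field.
    List<String> eval(Object... keys) {
        return table.getOrDefault(Arrays.asList(keys), Collections.emptyList());
    }
}
```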

Best, Jingsong Lee




Re: LookupableTableSource question

2019-06-28 Thread Flavio Pompermaier
Sorry, I pasted the current eval method twice... I'd do this:

public void eval(Object... keys) {
    for (Object kkk : keys) {
        Row keyRow = Row.of(kkk);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
    }
    ...



Re: LookupableTableSource question

2019-06-28 Thread Flavio Pompermaier
This could be a good fit; I'll dig into it and see whether it can be
adapted to a REST service.
The only strange thing I see is that the local cache is keyed per
block of keys... am I wrong?
Shouldn't it loop over the list of passed keys?

Right now it's the following:

Cache<Row, List<Row>> cache;

public void eval(Object... keys) {
    Row keyRow = Row.of(keys);
    if (cache != null) {
        List<Row> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (Row cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }
    ...

while I'd use the following (also for JDBC):

Cache<Row, List<Row>> cache;

public void eval(Object... keys) {
    Row keyRow = Row.of(keys);
    if (cache != null) {
        List<Row> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (Row cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }
    ...

public void eval(Object... keys) {
    for (Object kkk : keys) {
        Row keyRow = Row.of(kkk);
        if (cache != null) {
            List<Row> cachedRows = cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (Row cachedRow : cachedRows) {
                    collect(cachedRow);
                }
                return;
            }
        }
    }
    ...

Am I missing something?
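These snippets show only the cache-hit path; the rest is elided. One common way to complete such a lookup is the cache-aside pattern, sketched below in plain Java. It assumes a hypothetical `backend` function in place of the JDBC or REST query and a `collector` in place of `TableFunction#collect`; it is an illustration, not the actual JDBCLookupFunction code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Cache-aside sketch in plain Java. `backend` (the JDBC/REST query) and `collector`
// (TableFunction#collect) are hypothetical stand-ins. The HashMap is unbounded here;
// a real function would bound and expire it.
final class CacheAsideLookupSketch {
    private final Map<List<Object>, List<String>> cache = new HashMap<>();
    private final Function<List<Object>, List<String>> backend;
    private final Consumer<String> collector;
    int backendCalls = 0; // only to show how often the backend is queried

    CacheAsideLookupSketch(Function<List<Object>, List<String>> backend, Consumer<String> collector) {
        this.backend = backend;
        this.collector = collector;
    }

    void eval(Object... keys) {
        List<Object> keyRow = Arrays.asList(keys); // one composite key, like Row.of(keys)
        List<String> cachedRows = cache.get(keyRow);
        if (cachedRows != null) {                  // hit: emit cached rows, skip the backend
            cachedRows.forEach(collector);
            return;
        }
        backendCalls++;                            // miss: query, cache (even if empty), emit
        List<String> rows = new ArrayList<>(backend.apply(keyRow));
        cache.put(keyRow, rows);
        rows.forEach(collector);
    }
}
```

This sketch also caches empty results, so a key added to the backend later stays invisible until its entry is dropped. That is the KeyNotFound situation from the original question; whether to cache misses is a design choice.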




Re: LookupableTableSource question

2019-06-28 Thread JingsongLee
Hi Flavio:

I just implemented a JDBCLookupFunction [1]. You can use it as a table function [2], 
or use the Blink temporal table join [3] (this needs Blink planner support).
I added a Google Guava cache to JDBCLookupFunction with a configurable cacheMaxSize
(to avoid OOM) and cacheExpireMs (to keep the lookup table fresh).
Is that what you want?
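With Guava, such a cache is typically built with `CacheBuilder.newBuilder().maximumSize(cacheMaxSize).expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS).build()`. To keep the example dependency-free, here is a hedged stdlib stand-in showing what the two settings mean: at most `maxSize` entries, evicting the least recently accessed first (Guava does not promise strict LRU), and entries treated as absent once `expireMs` has passed since they were written. The class name and the injectable clock are assumptions made for the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Stdlib stand-in (not Guava) for a size-bounded, expire-after-write cache.
final class BoundedExpiringCache<K, V> {
    // Value plus the time it was written, used for expiry.
    private static final class Timestamped<V> {
        final V value;
        final long writtenAt;

        Timestamped(V value, long writtenAt) {
            this.value = value;
            this.writtenAt = writtenAt;
        }
    }

    private final long expireMs;
    private final LongSupplier clock; // e.g. System::currentTimeMillis; injectable for tests
    private final LinkedHashMap<K, Timestamped<V>> map;

    BoundedExpiringCache(int maxSize, long expireMs, LongSupplier clock) {
        this.expireMs = expireMs;
        this.clock = clock;
        // accessOrder = true: iteration order is least recently accessed first,
        // so removeEldestEntry evicts the LRU entry once maxSize is exceeded.
        this.map = new LinkedHashMap<K, Timestamped<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timestamped<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    // Like Cache#getIfPresent: null if the key is missing or its entry has expired.
    V getIfPresent(K key) {
        Timestamped<V> entry = map.get(key);
        if (entry == null) {
            return null;
        }
        if (clock.getAsLong() - entry.writtenAt >= expireMs) {
            map.remove(key); // expired: drop it so the next lookup goes to the database
            return null;
        }
        return entry.value;
    }

    void put(K key, V value) {
        map.put(key, new Timestamped<>(value, clock.getAsLong()));
    }
}
```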

[1] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
[2] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
[3] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75

 Best, JingsongLee


--
From:Flavio Pompermaier 
Send Time: Friday, June 28, 2019 21:04
To:user 
Subject:LookupableTableSource question

Hi to all,
I have a use case where I'd like to enrich a stream using a rarely updated 
lookup table.
Basically, I'd like to be able to set a refresh policy that is triggered either 
when a key is not found (a new key has probably been added in the meantime) 
or when a configurable refresh period has elapsed.

Is there any suggested solution to this? The LookupableTableSource looks very 
similar to what I'd like to achieve, but I can't find a real-world example using 
it, and it doesn't support these two requirements (key-values are not refreshed 
after a configurable timeout, and a KeyNotFound callback cannot be handled).
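One possible shape for the refresh policy described above is to reload the whole table when the refresh period has elapsed or when a key is not found. This assumes the lookup table is small enough to hold in memory. It is a plain-Java sketch, not a Flink API; `loader` is a hypothetical stand-in for a full JDBC or REST fetch, and the clock is injectable for testing.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Sketch of a full-snapshot lookup table with two reload triggers:
// (1) the refresh period has elapsed, (2) a key is not found.
final class RefreshingLookupTable<K, V> {
    private final Supplier<Map<K, V>> loader; // hypothetical full fetch of the lookup table
    private final long refreshPeriodMs;
    private final LongSupplier clock;
    private Map<K, V> snapshot = Collections.emptyMap();
    private long loadedAt;
    private boolean loaded = false;
    int loads = 0; // only to show how often the table is reloaded

    RefreshingLookupTable(Supplier<Map<K, V>> loader, long refreshPeriodMs, LongSupplier clock) {
        this.loader = loader;
        this.refreshPeriodMs = refreshPeriodMs;
        this.clock = clock;
    }

    private void reload() {
        snapshot = new HashMap<>(loader.get()); // copy, so later source changes need a reload
        loadedAt = clock.getAsLong();
        loaded = true;
        loads++;
    }

    // Returns the value for key, or null if it is still missing right after a reload.
    V lookup(K key) {
        if (!loaded || clock.getAsLong() - loadedAt >= refreshPeriodMs) {
            reload(); // trigger 1: first use or refresh period elapsed
            return snapshot.get(key);
        }
        V value = snapshot.get(key);
        if (value == null) {
            reload(); // trigger 2: key not found, it may have been added in the meantime
            value = snapshot.get(key);
        }
        return value;
    }
}
```

As written, every miss triggers a reload, so a stream of genuinely unknown keys would reload the table on each record; in practice you would probably also enforce a minimum interval between miss-triggered reloads.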

Any help is appreciated,
Flavio