Re: Persisting Table in Flink API

2018-07-17 Thread Shivam Sharma
Thanks, Vino & Hequn.

On Mon, Jul 16, 2018 at 5:47 PM Hequn Cheng  wrote:

> Hi Shivam,
>
> I think the non-window stream-stream join can solve your problem.
> The non-window join will store all data from both inputs and output joined
> results. The semantics of non-window join is exactly the same with batch
> join.
> One important thing to note is that the state of join might grow
> infinitely depending on the number of distinct input rows, so please
> provide a query configuration with valid retention interval[1] to prevent
> excessive state size.
>
> Let me know If you have any other confusions.
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#idle-state-retention-time
>
> On Mon, Jul 16, 2018 at 5:18 PM, Shivam Sharma <28shivamsha...@gmail.com>
> wrote:
>
>> Hi Vino,
>>
>> First I want to tell you that we are working on Flink SQL so there is no
>> chance to use Data Stream API.
>>
>> I will give one example of my use case here:-
>>
>> Let's say we have two Kafka Topics:
>>
>>1. UserName to UserId Mapping => {"userName": "shivam", "userId": 123}
>>2. User transactions information in which username is coming. => {"user":
>>"shivam", "transactionAmount": 3250}
>>
>> Final result should be like this  => {"user": "shivam", "userId": 123,
>> "transactionAmount": 3250}
>>
>> SQL Query for this: SELECT t2.user, t1.userID, t2.transactionAmount from
>> userTable as t1 join transactionTable as t2 on t1.userName = t2.user
>>
>> Now, whenever a transaction happens then we need to add UserId also in
>> the record using Flink SQL. We need to join these two streams. So need to
>> store userName to id mapping somewhere like in RocksDB
>>
>> Thanks
>>
>> On Mon, Jul 16, 2018 at 12:04 PM vino yang  wrote:
>>
>>> Hi Shivam,
>>>
>>> Can you provide more details about your use case? The join for batch or
>>> streaming? which join type (window or non-window or stream-dimension table
>>> join)?
>>>
>>> If it is stream-dimension table join and the table is huge, use Redis
>>> or some cache based on memory, can help to process your problem. And you
>>> can customize the flink's physical plan (like Hequn said) and use async
>>> operator to optimize access to the third-party system.
>>>
>>> Thanks,
>>> Vino yang.
>>>
>>> 2018-07-16 9:17 GMT+08:00 Hequn Cheng :
>>>
 Hi Shivam,

 Currently, fink sql/table-api support window join and non-window
 join[1].
 If your requirements are not being met by sql/table-api, you can also
 use the datastream to implement your own logic. You can refer to the
 non-window join implement as an example[2][3].

 Best, Hequn

 [1]
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
 [2]
 https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 [3]
 https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala

 On Sun, Jul 15, 2018 at 11:29 PM, Shivam Sharma <
 28shivamsha...@gmail.com> wrote:

> Hi,
>
> We have one use case in which we need to persist Table in Flink which
> can be later used to join with other tables. This table can be huge so we
> need to store it in off-heap but faster access. Any suggestions regarding
> this?
>
> --
> Shivam Sharma
> Data Engineer @ Goibibo
> Indian Institute Of Information Technology, Design and Manufacturing
> Jabalpur
> Mobile No- (+91) 8882114744
> Email:- 28shivamsha...@gmail.com
> LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
> *
>


>>>
>>
>> --
>> Shivam Sharma
>> Data Engineer @ Goibibo
>> Indian Institute Of Information Technology, Design and Manufacturing
>> Jabalpur
>> Mobile No- (+91) 8882114744
>> Email:- 28shivamsha...@gmail.com
>> LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
>> *
>>
>
>

-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsha...@gmail.com
LinkedIn:- https://www.linkedin.com/in/28shivamsharma


Re: Persisting Table in Flink API

2018-07-16 Thread Hequn Cheng
Hi Shivam,

I think the non-window stream-stream join can solve your problem.
The non-window join stores all data from both inputs and outputs joined
results. The semantics of the non-window join are exactly the same as a
batch join.
One important thing to note is that the join state might grow infinitely
depending on the number of distinct input rows, so please provide a query
configuration with a valid retention interval[1] to prevent excessive state
size.

Let me know if you have any other questions.

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#idle-state-retention-time
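To make the state behavior concrete, here is a minimal plain-Python sketch (not Flink code; all names are illustrative) of the non-window join semantics: every row from both inputs is kept in state, so state grows with the number of distinct input rows unless idle state retention evicts unused keys.

```python
# Plain-Python simulation of non-window stream-stream join state.
# Every row from both sides is retained, which is why the state can
# grow without bound unless a retention policy cleans up idle keys.

from collections import defaultdict

class NonWindowJoin:
    def __init__(self):
        self.left_state = defaultdict(list)   # join key -> rows seen on the left
        self.right_state = defaultdict(list)  # join key -> rows seen on the right

    def on_left(self, key, row):
        """Store a left row and emit a result for every matching right row."""
        self.left_state[key].append(row)
        return [(row, r) for r in self.right_state[key]]

    def on_right(self, key, row):
        """Store a right row and emit a result for every matching left row."""
        self.right_state[key].append(row)
        return [(l, row) for l in self.left_state[key]]

join = NonWindowJoin()
join.on_left("shivam", {"userName": "shivam", "userId": 123})
out = join.on_right("shivam", {"user": "shivam", "transactionAmount": 3250})
print(out)  # one joined (mapping, transaction) pair
```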

On Mon, Jul 16, 2018 at 5:18 PM, Shivam Sharma <28shivamsha...@gmail.com>
wrote:



Re: Persisting Table in Flink API

2018-07-16 Thread vino yang
Hi Shivam,

Thanks for providing more details about your use case.

So I understand that you mean a non-window join of two DataStreams. There
are two ways to implement this:

1. Use Flink's Table/SQL non-window join for streaming: this way the
messages are stored in state by Flink. You do not have to manage the state
yourself, but you should pay attention to the idle state retention time,
see here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#idle-state-retention-time
2. Actually, your use case looks more like a join between a stream and a
dimension table (static, but possibly updated). In this case you can store
the first stream wherever you want, but such a join is not supported by
Flink Table/SQL yet. That is why I said you may need to customize Table/SQL
if you only want to use Table/SQL. I did not suggest that you use and
customize the DataStream API; I meant that if you want this feature in
Table/SQL you should customize the physical plan of the join, see here:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala#L223
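To illustrate the access pattern of option 2, here is a plain-Python asyncio sketch (not Flink's actual async operator API; the store, its latency, and all names are hypothetical stand-ins): each transaction is enriched with a userId fetched asynchronously from an external store, with all lookups in flight concurrently.

```python
# Hypothetical sketch of async dimension-table lookup: enrich each
# transaction with a userId from an external store without blocking
# on each lookup sequentially.

import asyncio

USER_DIM = {"shivam": 123}  # stand-in for Redis / RocksDB / any external store

async def lookup_user_id(user_name):
    await asyncio.sleep(0)  # models the network round trip to the store
    return USER_DIM.get(user_name)

async def enrich(txn):
    user_id = await lookup_user_id(txn["user"])
    return {**txn, "userId": user_id}

async def run(txns):
    # issue all lookups concurrently, as an async operator would
    return await asyncio.gather(*(enrich(t) for t in txns))

enriched = asyncio.run(run([{"user": "shivam", "transactionAmount": 3250}]))
print(enriched[0])  # transaction enriched with userId 123
```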

---
Thanks.
Vino.



2018-07-16 17:18 GMT+08:00 Shivam Sharma <28shivamsha...@gmail.com>:



Re: Persisting Table in Flink API

2018-07-16 Thread Shivam Sharma
Hi Vino,

First I want to tell you that we are working on Flink SQL so there is no
chance to use Data Stream API.

I will give one example of my use case here:-

Let's say we have two Kafka Topics:

   1. UserName-to-UserId mapping => {"userName": "shivam", "userId": 123}
   2. User transaction information, which contains the username => {"user":
   "shivam", "transactionAmount": 3250}

The final result should look like this => {"user": "shivam", "userId": 123,
"transactionAmount": 3250}

SQL query for this: SELECT t2.user, t1.userId, t2.transactionAmount FROM
userTable AS t1 JOIN transactionTable AS t2 ON t1.userName = t2.user

Now, whenever a transaction happens, we need to add the userId to the record
using Flink SQL. We need to join these two streams, so we need to store the
userName-to-userId mapping somewhere, e.g., in RocksDB.
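To make the expected semantics concrete, the same join as a plain-Python sketch (sample data only, not Flink code): the latest userName-to-userId mapping from the first topic is held in a dict, playing the role that RocksDB-backed state would play, and each transaction from the second topic is enriched from it.

```python
# Plain-Python sketch of the desired enrichment join (sample data only).

mapping_events = [{"userName": "shivam", "userId": 123}]
transactions = [{"user": "shivam", "transactionAmount": 3250}]

user_ids = {}
for m in mapping_events:
    user_ids[m["userName"]] = m["userId"]  # last write wins

results = [
    {"user": t["user"],
     "userId": user_ids[t["user"]],
     "transactionAmount": t["transactionAmount"]}
    for t in transactions
    if t["user"] in user_ids  # inner-join semantics: drop unmatched users
]
print(results[0])
```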

Thanks

On Mon, Jul 16, 2018 at 12:04 PM vino yang  wrote:


-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsha...@gmail.com
LinkedIn:- https://www.linkedin.com/in/28shivamsharma


Re: Persisting Table in Flink API

2018-07-16 Thread vino yang
Hi Shivam,

Can you provide more details about your use case? Is the join for batch or
streaming? Which join type (window, non-window, or stream-dimension table
join)?

If it is a stream-dimension table join and the table is huge, using Redis or
another in-memory cache can help solve your problem. You can also customize
Flink's physical plan (as Hequn said) and use an async operator to optimize
access to the third-party system.

Thanks,
Vino yang.

2018-07-16 9:17 GMT+08:00 Hequn Cheng :



Re: Persisting Table in Flink API

2018-07-15 Thread Hequn Cheng
Hi Shivam,

Currently, Flink SQL/Table API supports window joins and non-window joins[1].
If your requirements are not met by SQL/Table API, you can also use the
DataStream API to implement your own logic. You can refer to the non-window
join implementation as an example[2][3].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
[2]
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
[3]
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala

On Sun, Jul 15, 2018 at 11:29 PM, Shivam Sharma <28shivamsha...@gmail.com>
wrote:



Persisting Table in Flink API

2018-07-15 Thread Shivam Sharma
Hi,

We have one use case in which we need to persist a Table in Flink which can
later be used to join with other tables. This table can be huge, so we need
to store it off-heap but with fast access. Any suggestions regarding this?
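For context, we were thinking of something along the lines of Flink's RocksDB state backend, which keeps keyed state off-heap, roughly this kind of flink-conf.yaml (values below are placeholders, not a tested configuration):

```yaml
# Sketch: keep keyed state off-heap in embedded RocksDB (example values).
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints   # example path, adjust
state.backend.incremental: true                    # checkpoint only changed state
```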

-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsha...@gmail.com
LinkedIn:- https://www.linkedin.com/in/28shivamsharma