Re: Initialization of broadcast state before processing main stream

2019-11-15 Thread Vasily Melnik
Maxim, many thanks.
We'll try buffering.

Best regards,
Vasily Melnik




Re: Initialization of broadcast state before processing main stream

2019-11-14 Thread Maxim Parkachov
Hi Vasily,

Unfortunately, this is a known issue with Flink. You can read the discussion
under
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

At the moment I have seen 3 solutions for this issue:

1. Buffer the fact stream in local state until the broadcast stream is
completely read.
2. Create a custom source for the fact stream and, in its open method, wait
until the broadcast stream is completely read.
3. With the latest Flink version, you can pre-populate state with the
dimension data and start the Flink job from that existing state. You need to
take care to set the correct Kafka offsets for the dimension stream, though;
otherwise you will get a gap between the pre-populated state and the moment
the job is started.
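
To illustrate option 1, here is a rough sketch in plain Java, deliberately
without any Flink dependencies. The class and method names (BufferingJoiner,
onDimension, onFact, markDimensionsLoaded) are made up for this example. In a
real job the buffer and the dimension map would live in Flink state, and
something else has to decide when the dimensions count as loaded.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical helper: holds back facts until the dimension data is known to be loaded. */
class BufferingJoiner {
    private final Map<String, String> dimensions = new HashMap<>();
    private final Queue<String> pendingFacts = new ArrayDeque<>();
    private boolean dimensionsLoaded = false;

    /** Store or update one dimension record; never emits anything. */
    void onDimension(String key, String value) {
        dimensions.put(key, value);
    }

    /** Buffer the fact while dimensions are incomplete, otherwise join it right away. */
    List<String> onFact(String factKey) {
        if (!dimensionsLoaded) {
            pendingFacts.add(factKey);
            return List.of();
        }
        return List.of(join(factKey));
    }

    /** Called once the dimension stream is "completely read"; returns the joined backlog in arrival order. */
    List<String> markDimensionsLoaded() {
        dimensionsLoaded = true;
        List<String> out = new ArrayList<>();
        while (!pendingFacts.isEmpty()) {
            out.add(join(pendingFacts.poll()));
        }
        return out;
    }

    private String join(String factKey) {
        return factKey + "=" + dimensions.getOrDefault(factKey, "<unknown>");
    }
}
```

The buffer is unbounded here, so its size depends on how many facts arrive
before the dimensions are loaded.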

The first two solutions need to know when the broadcast stream is "completely
read". I created a workaround for this with a custom source for dimension
events. It creates a "stop file" on a shared file system, reads the Kafka end
offsets for the dimension topic through the admin interface, starts
processing all messages from the beginning, and clears the "stop file" once
the message offsets have reached the end offsets for all partitions. Instead
of a "stop file" you could use a shared lock in ZooKeeper.
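
The "reached end offsets for all partitions" check could be sketched roughly
as below. This is not the actual source, only plain Java showing the offset
bookkeeping. CatchUpTracker and its methods are hypothetical names, and the
begin/end offset maps are assumed to have been fetched from Kafka beforehand.
It relies on the Kafka convention that a partition's end offset is the offset
of the last available message plus one, and it assumes a non-transactional
topic, so that the record at end offset minus one really exists.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker: decides when every partition has been read up to its captured end offset. */
class CatchUpTracker {
    // partition -> end offset that still has to be reached
    private final Map<Integer, Long> remaining = new HashMap<>();

    CatchUpTracker(Map<Integer, Long> beginOffsets, Map<Integer, Long> endOffsets) {
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long begin = beginOffsets.getOrDefault(e.getKey(), 0L);
            // A partition with begin == end holds no records and is already caught up.
            if (e.getValue() > begin) {
                remaining.put(e.getKey(), e.getValue());
            }
        }
    }

    /** Record that the message at the given offset of the given partition was processed. */
    void onRecord(int partition, long offset) {
        Long end = remaining.get(partition);
        // End offset is "last offset + 1", so offset + 1 >= end means the snapshot is fully read.
        if (end != null && offset + 1 >= end) {
            remaining.remove(partition);
        }
    }

    /** True once all partitions have reached their end offsets; this is when the "stop file" would be cleared. */
    boolean isCaughtUp() {
        return remaining.isEmpty();
    }
}
```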

Hope this helps,
Maxim.



Re: Initialization of broadcast state before processing main stream

2019-11-13 Thread vino yang
Hi Vasily,

Currently, Flink does not coordinate between a regular stream and a
broadcast stream; both are simply streams. Your use of broadcast state is a
special case. In the more general case, the state to be broadcast comes from
an unbounded stream, and state events may be broadcast downstream at any
time. So Flink cannot wait for the broadcast to finish before processing the
regular stream events.

For your scenario:


   - you can change the storage for your dimension table, e.g. to Redis or
   MySQL, and join the stream against the dimension table there;
   - you can inject a control event into your broadcast stream to mark the
   end of the stream and let the fact stream wait until it receives the
   control event. Or you can introduce a third-party coordinator, e.g.
   ZooKeeper, to coordinate them; however, that would make your solution
   more complex.
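
For the first suggestion (joining against an external dimension store), a
minimal sketch in plain Java might look like this. LookupJoiner is a
hypothetical name, and the Function passed to it merely stands in for
whatever Redis or MySQL client call you would actually use. No real client
API is shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical lookup join: enriches each fact from an external store, with a local cache. */
class LookupJoiner {
    // Stands in for a Redis/MySQL lookup; returns empty when the key is not in the dimension table.
    private final Function<String, Optional<String>> store;
    private final Map<String, String> cache = new HashMap<>();

    LookupJoiner(Function<String, Optional<String>> store) {
        this.store = store;
    }

    /** Join one fact with its dimension value, querying the store only on a cache miss. */
    String enrich(String factKey) {
        String value = cache.get(factKey);
        if (value == null) {
            value = store.apply(factKey).orElse(null);
            if (value != null) {
                // Only hits are cached, so a dimension that appears later is still found.
                cache.put(factKey, value);
            }
        }
        return factKey + "=" + (value != null ? value : "<unknown>");
    }
}
```

Note that this cache never expires entries, so later changes to a cached
dimension record would not be seen. A real job would probably want a TTL or
size limit.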

Best,
Vino




Initialization of broadcast state before processing main stream

2019-11-13 Thread Vasily Melnik
Hi all.

In our task we have two Kafka topics:
- one with fact stream (web traffic)
- one with dimension data

We would like to put the dimension data into broadcast state and look it up
for each fact. But we see that not all dimension records are in the state
before the first fact record is processed, so the lookup returns no data.

The question is: how can we read the fact topic with some "delay" to give
the dimension data enough time to initialize the state?


Best regards,
Vasily Melnik