Hi,
I think there are two (somewhat) orthogonal problems here:

1) Determining when a stream of input data switches from the "reading old
data" to the "reading current data"  phase.

2) Blocking/buffering one input of an operator depending on some condition
on the other input.

I think 1. can only be solved by user code. 2) is quite hard: we would
either need to buffer elements of the blocked input and only start
processing once we are "allowed to" this can blow up very quickly and we
have to checkpoint those buffered elements or we would have to block
receiving data on that input which is not possible right now because this
can potentially lead to deadlocks in the topology.

I'm afraid there is no good solution for this right now.

Cheers,
Aljoscha

On Fri, 10 Feb 2017 at 09:26 Jonas <jo...@huntun.de> wrote:

> Tzu-Li (Gordon) Tai wrote
>
> Stream A has a rate of 100k tuples/s. After processing the whole Kafka
> queue, the rate drops to 10 tuples/s.
>
> Absolutely correct.
>
> Tzu-Li (Gordon) Tai wrote
>
> So what you are looking for is that flatMap2 for stream B only doing work
> after the job reaches the latest record in stream A?
>
> Very much so.
>
> Tzu-Li (Gordon) Tai wrote
> You could perhaps insert a special marker event into stream A every time
> you start running this job. Stream A has a rate of 100k tuples/s. After
> processing the whole Kafka queue, the rate drops to 10 tuples/s.
>
> I tried using stream punctuations but it is hard to know which one is the
> "last" punctuation, since after some time I might have mutliple in there.
>
> Imagine stream A has in total about 100M messages. We insert a Punctuation
> as message number 100.000.001. Works.
>
> Next week we need to start the job again. Stream A now has 110M messages
> and 2 punctuation marks. One at the 100.000.001 and one at 110.000.001.
> I cannot decide which one is the latest while processing the the stream.
>
> Tzu-Li (Gordon) Tai wrote
>
> Then, once flatMap2 is invoked with the special event, you can toggle
> logic in flatMap2 to actually start doing stuff.
>
> This has the issue that while stream A is being processed, I lose tuples
> from stream B because it is not "stopped". I think my use case is currently
> not really doable in Flink. -- Jonas
> ------------------------------
> View this message in context: Re: Start streaming tuples depending on
> another streams rate
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Start-streaming-tuples-depending-on-another-streams-rate-tp11542p11559.html>
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com.
>

Reply via email to