A Flink application does not have a problem if it ingests two streams with
very different throughput as long as they are somewhat synced on their
This is typically the case when ingesting real-time data. In such
scenarios, an application would not buffer more data than necessary.

When reading two streams of historic data with different "density" (events
per time interval) or real-time streams that are off by some time interval,
the application needs to buffer more data to compensate for the difference
in time.
In case of real-time streams that are off by a (more or less) fixed offset,
you should plan for the additional state requirements. Syncing sources to
the same event-time would help in both cases.
However, Flink's RocksDB state backend is also pretty good in handling very
large state sizes due to asynchronous and incremental checkpointing.

The window join functions of the SQL and Table API are implemented using a
CoProcessFunction and so is the new join operator that I pointed to.

Syncing sources is not really related to fault tolerance except that
additional state affects the checkpointing and recovery performance.
Pausing sources can cause problems because watermarks do not advance when
no data is ingested, but again this is not related to fault tolerance.

The credit-based network transfer will be included in Flink 1.5. However,
this is not related to the question discussed here.
It only applies to cases where an operator cannot continue processing, for
example if the function call does not return.
An operator cannot decide to block a particular input and process the other

Long story short.
If you join two streams on event time, you need to buffer the data for the
join window + the event time difference between both streams.

Best, Fabian

2018-03-09 9:28 GMT+01:00 Gytis Žilinskas <gytis.zilins...@gmail.com>:

> Thanks for the answers and discussion both of you.
> The FLIP mentions that the cases where one stream is much faster than
> the other one, will not be handled for now either, so I guess it would
> still not solve our problems. As for the join semantics itself, I
> think we achieve the same thing with CoProcessFunction, unless I'm
> missing something.
> Anyway, one couple more questions then. It seems weird that this issue
> isn't much more talked about or prioritized. That leads me to believe
> that maybe we're misunderstanding the use case for flink, or maybe
> other users have a different architecture / environment that doesn't
> present them with such problems. Could you describe how it is usually
> used?
> From the documentation and talks it looks like fault tolerance is an
> important concept in flink, so a source pausing, or slowing down is
> expected. The way I see it, the only options to deal with it at the
> moment:
> 1) have a cluster size that can buffer everything for as long as
> needed and is able to eventually catch up
> 2) model the behaviour so that the streams that are ahead, can go
> through after some cutoff time
> do most of the applications just fall into one of these behaviours?
> Finally, are there some ideas about extending capabilities of the
> backpressure mechanism that would allow of building some sort of
> functionality, similar to what I was describing in the initial mail.
> With some prioritisation to one of the streams, or other custom
> stalling behaviour. (maybe this credit based approach Vishal mentions?
> The FLIP document is not public, so can't really tell)
> Thanks again for all the help!
> Gytis
> On Thu, Mar 8, 2018 at 7:48 PM, Vishal Santoshi
> <vishal.santo...@gmail.com> wrote:
> > Yep.  I think this leads to this general question and may be not
> pertinent
> > to https://github.com/apache/flink/pull/5342.  How do we throttle a
> source
> > if the held back data gets unreasonably large ? I know that that is in
> > itself a broader question but delayed watermarks of slow stream
> accentuates
> > the issue . I am curious to know how credit based back pressure handling
> > plays or is that outside the realm of this discussion ? And is credit
> based
> > back pressure handling in 1.5 release.
> >
> > On Thu, Mar 8, 2018 at 12:23 PM, Fabian Hueske <fhue...@gmail.com>
> wrote:
> >>
> >> The join would not cause backpressure but rather put all events that
> >> cannot be processed yet into state to process them later.
> >> So this works well if the data that is provided by the streams is
> roughly
> >> aligned by event time.
> >>
> >> 2018-03-08 9:04 GMT-08:00 Vishal Santoshi <vishal.santo...@gmail.com>:
> >>>
> >>> Aah we have it here
> >>> https://docs.google.com/document/d/16GMH5VM6JJiWj_
> N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6
> >>>
> >>> On Thu, Mar 8, 2018 at 11:45 AM, Vishal Santoshi
> >>> <vishal.santo...@gmail.com> wrote:
> >>>>
> >>>> This is very interesting.  I would imagine that there will be high
> back
> >>>> pressure on the LEFT source effectively throttling it but as is the
> current
> >>>> state that is likely effect other pipelines as the free o/p buffer on
> the
> >>>> source side and and i/p buffers on the consumer side start blocking
> and get
> >>>> exhausted for all other pipes. I am very interested in how holding
> back the
> >>>> busy source does not create a pathological  issue where that source is
> >>>> forever held back. Is there a FLIP for it ?
> >>>>
> >>>> On Thu, Mar 8, 2018 at 11:29 AM, Fabian Hueske <fhue...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>> Hi Gytis,
> >>>>>
> >>>>> Flink does currently not support holding back individual streams, for
> >>>>> example it is not possible to align streams on (offset) event-time.
> >>>>>
> >>>>> However, the Flink community is working on a windowed join for the
> >>>>> DataStream API, that only holds the relevant tail of the stream as
> state.
> >>>>> If your join condition is +/- 5 minutes then, the join would store he
> >>>>> last five minutes of both streams as state. Here's an implementation
> of the
> >>>>> operator [1] that is close to be merged and will be available in
> Flink
> >>>>> 1.6.0.
> >>>>> Flink's SQL support (and Table API) support this join type since
> >>>>> version 1.4.0 [2].
> >>>>>
> >>>>> Best, Fabian
> >>>>>
> >>>>> [1] https://github.com/apache/flink/pull/5342
> >>>>> [2]
> >>>>> https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/table/sql.html#joins
> >>>>>
> >>>>> 2018-03-08 1:02 GMT-08:00 Gytis Žilinskas <gytis.zilins...@gmail.com
> >:
> >>>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> we're considering flink for a couple of our projects. I'm doing a
> >>>>>> trial implementation for one of them. So far, I like a lot of
> things,
> >>>>>> however there are a couple of issues that I can't figure out how to
> >>>>>> resolve. Not sure if it's me misunderstanding the tool, or flink
> just
> >>>>>> doesn't have a capability to do it.
> >>>>>>
> >>>>>> We want to do an event time join on two big kafka streams. Both of
> >>>>>> them might experience some issues on the other end and be delayed.
> >>>>>> Additionally, while both are big, one (let's call it stream A) is
> >>>>>> significantly larger than stream B.
> >>>>>>
> >>>>>> We also know, that the join window is around 5min. That is, given
> some
> >>>>>> key K in stream B, if there is a counterpart in stream A, it's going
> >>>>>> to be +/5 5min in event time.
> >>>>>>
> >>>>>> Since stream A is especially heavy and it's unfeasable to keep hours
> >>>>>> of it in memory, I would imagine an ideal solution where we read
> both
> >>>>>> streams from Kafka. We always make sure that stream B is ahead by
> >>>>>> 10min, that is, if stream A is currently ahead in watermarks, we
> stall
> >>>>>> it and consume stream B until it catches up. Once the stream are
> >>>>>> alligned in event time (with the 10min delay window) we run them
> both
> >>>>>> through join.
> >>>>>>
> >>>>>> The problem is, that I find a mechanism to implement that in flink.
> If
> >>>>>> I try to do a CoProcessFunction then it just consumes both streams
> at
> >>>>>> the same time, ingests a lot of messages from stream A, runs out of
> >>>>>> memory and dies.
> >>>>>>
> >>>>>> Any ideas on how this could be solved?
> >>>>>>
> >>>>>> (here's a thread with a very similar problem from some time ago
> >>>>>>
> >>>>>> http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/synchronizing-two-streams-td6830.html)
> >>>>>>
> >>>>>> Regards,
> >>>>>> Gytis
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >

Reply via email to