Sorry for the belated reply. You are right that the functionality proposed
in this FLIP can be implemented outside of the Flink core as an ecosystem
project.
The main motivation of this FLIP is twofold:
1. Improve the performance of intermediate result sharing within the same
session.
Using the internal shuffle service to store cached results has two potential
drawbacks:
a) the cached intermediate results may break operator chaining due to
the addition of the BLOCKING_PERSISTENT edge;
b) the downstream processor must read all the records in the intermediate
results in order to process them.
A pluggable intermediate result storage will help address both of these
problems. Adding a sink does not break chaining; it merely ensures that the
cached logical node will not be optimized away. The pluggable storage can
also improve performance by making the intermediate results filterable,
projectable, etc. Alternatively, we could make the shuffle service more
sophisticated, but that may complicate things unnecessarily.
This motivation seems difficult to address as an external library on
top of the Flink core, mainly because cleaning up in-session intermediate
results may need the participation of the ResourceManager to achieve fault
tolerance. Also, an external library would essentially introduce a second
way to cache the in-session intermediate results.
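To make the contrast concrete, here is a minimal sketch in plain Java of the difference between a shuffle-style cache (the consumer must read every record) and a filterable pluggable storage. All names here (IntermediateResultStorage, readFiltered, InMemoryStorage) are hypothetical and for illustration only; they are not actual Flink APIs.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical storage abstraction; not a real Flink interface.
interface IntermediateResultStorage<T> {
    void write(List<T> records);               // sink side: persist the cached result
    List<T> readAll();                         // shuffle-style read: every record
    List<T> readFiltered(Predicate<T> filter); // pushed-down filter: storage skips records
}

// Minimal in-memory implementation for illustration.
class InMemoryStorage<T> implements IntermediateResultStorage<T> {
    private List<T> cached;
    public void write(List<T> records) { this.cached = records; }
    public List<T> readAll() { return cached; }
    public List<T> readFiltered(Predicate<T> filter) {
        return cached.stream().filter(filter).collect(Collectors.toList());
    }
}

public class Main {
    public static void main(String[] args) {
        IntermediateResultStorage<Integer> storage = new InMemoryStorage<>();
        storage.write(List.of(1, 2, 3, 4, 5, 6));

        // A shuffle-based cache forces the consumer to read all 6 records...
        System.out.println(storage.readAll().size());      // 6
        // ...while a filterable storage hands back only what the consumer needs.
        System.out.println(storage.readFiltered(n -> n % 2 == 0).size()); // 3
    }
}
```

The point is only the shape of the abstraction: with filter/projection pushdown the storage, not the downstream job, decides which records cross the wire.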
2. Cross-session intermediate result sharing.
As you said, this can be implemented as an external library. The only
difference is that users may need to deal with another set of APIs, but that
seems acceptable.
So for this FLIP, it would be good to see whether we think motivation 1 is
worth addressing or not.
What do you think?
Jiangjie (Becket) Qin
On Thu, Aug 15, 2019 at 11:42 PM Stephan Ewen wrote:
> Sorry for the late response. So many FLIPs these days.
> I am a bit unsure about the motivation here, and whether this needs to be
> part of Flink. It sounds like this can be perfectly built around Flink as a
> minimal library on top of it, without any change to the core APIs or
> runtime.
> The proposal to handle "caching intermediate results" (to make them
> reusable across jobs in a session), and "writing them in different formats
> / indexing them" doesn't sound like it should be the same mechanism.
> - The caching part is a transparent low-level primitive. It avoids
> re-executing a part of the job graph, but otherwise is completely
> transparent to the consumer job.
> - Writing data out in a sink, compressing/indexing it and then reading it
> in another job is also a way of reusing a previous result, but on a
> completely different abstraction level. It is not the same intermediate
> result any more. When the consumer reads from it and applies predicate
> pushdown, etc. then the consumer job looks completely different from a job
> that consumed the original result. It hence needs to be solved on the API
> level via a sink and a source.
> I would suggest to keep these concepts separate: Caching (possibly
> automatically) for jobs in a session, and long term writing/sharing of data.
> Solving the "long term writing/sharing" in a library rather than in the
> runtime also has the advantage of not pushing yet more stuff into Flink's
> core, which I believe is also an important criterion.
> On Thu, Jul 25, 2019 at 4:53 AM Xuannan Su wrote:
> > Hi folks,
> > I would like to start the FLIP discussion thread about the pluggable
> > intermediate result storage.
> > This is phase 2 of FLIP-36: Support Interactive Programming in Flink.
> > While FLIP-36 provides a default implementation of
> > the intermediate result storage using the shuffle service, we would like to
> > make the intermediate result storage pluggable so that users can
> > easily swap the storage.
> > We are looking forward to your thoughts!
> > The FLIP link is the following:
> > <
> > >
> > Best,
> > Xuannan