> running a batch or "bounded stream" job first to generate a "cache state",
> and then launching the main streaming job, which loads this initial state
> in open()... not sure how to work out the keying.
>
This is the recommended workaround for this issue: first start a job to
precompute the values for the initial state, then pass those values to the
main job as (for example) a startup argument. I think for now it’s the
cleanest and easiest-to-maintain solution. If the initial state is too large,
you could save it on a DFS and load it in the initialisation phase of the
main job.

Piotrek

> On 3 May 2018, at 19:03, Derek VerLee <derekver...@gmail.com> wrote:
>
> Thanks for the thoughts Piotr.
>
> Seems I have a talent for asking (nearly) the same question as someone else
> at the same time, and the check-pointing was raised in that thread as well.
>
> I guess one way to conceptualize it is that you have a stream job that has
> "phases" and transitions between those phases. Maybe there would be a new
> type of barrier to indicate a change between phases? But now I'm way outside
> the bounds of hoping to have a "quick and dirty" version of a proper side
> input implementation.
>
> I'm chewing on two new ideas now: Using a "union" stream instead of two
> streams, and a custom source backed by two different sources under the hood,
> so the "state machine" logic transitioning from initialization to normal
> operation all happens in the same operator. Or, running a batch or "bounded
> stream" job first to generate a "cache state", and then launching the main
> streaming job, which loads this initial state in open()... not sure how
> to work out the keying.
>
> I'll post back if I get anywhere with these ideas.
>
> On 5/3/18 10:49 AM, Piotr Nowojski wrote:
>> Maybe it could work with Flink 1.5’s credit-based flow control. But you
>> would need a way to express the state “block one input side of the
>> CoProcessFunction”, pass this information up to the input gate and handle
>> it, probably similar to how
>> `org.apache.flink.streaming.runtime.io.CachedBufferBlocker` blocks
>> inputs in case of a checkpoint barrier. You cannot just block inside the
>> `processElement1` method.
>>
>> However I haven’t thought it through and maybe there could be some issues
>> regarding checkpointing (what should happen to checkpoint barriers if you
>> are blocking one side of the input? Should this block the checkpoint
>> barrier as well? Should you cancel the checkpoint?).
>>
>> Piotrek
>>
>>> On 2 May 2018, at 16:31, Derek VerLee <derekver...@gmail.com> wrote:
>>>
>>> I was just thinking about letting a CoProcessFunction "block" or
>>> cause back pressure on one of its streams.
>>> Has this been discussed as an option?
>>> Does anyone know a way to effectively accomplish this?
>>>
>>> I think I could get a lot of mileage out of something like that without
>>> needing a full implementation of FLIP-17 (which I would eagerly await
>>> still).
>>>
>>> As mentioned on another thread, one could use a ListState to buffer the
>>> main input until the "side input" was sufficiently processed. However the
>>> downside of this is that I have no way to control the size of those
>>> buffers, whereas with backpressure, the system will naturally take care of
>>> it.
>
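
On the open keying question in the precompute-then-load workaround: a minimal sketch, in plain Java rather than Flink API, of one way it might be handled. As far as I know, keyed state cannot be read or written in open() because no key is in scope there, so the idea is to load the whole snapshot into an ordinary in-memory map at startup and seed each key's state lazily the first time an element for that key arrives. The class name `SeededCounter`, the `key=count` file format and the counting logic are all assumptions made for illustration; in a real job the two methods would correspond to open() and processElement() of a rich function on a keyed stream, and `keyedState` would be real keyed state.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java stand-in for a streaming operator; none of these names are Flink API. */
class SeededCounter {
    // Written by the bootstrap job; assumed format is one "key=count" pair per line.
    private final Path snapshotFile;
    // Loaded once at startup; not keyed, so it is safe to fill before any element arrives.
    private final Map<String, Long> snapshot = new HashMap<>();
    // Stands in for per-key state, which only exists while an element is being processed.
    private final Map<String, Long> keyedState = new HashMap<>();

    SeededCounter(Path snapshotFile) {
        this.snapshotFile = snapshotFile;
    }

    /** Analogous to open(): runs once, before the first element. */
    void open() throws IOException {
        for (String line : Files.readAllLines(snapshotFile)) {
            int eq = line.indexOf('=');
            if (eq <= 0) {
                continue; // skip blank or malformed lines
            }
            String key = line.substring(0, eq).trim();
            long count = Long.parseLong(line.substring(eq + 1).trim());
            snapshot.put(key, count);
        }
    }

    /** Analogous to processElement(): seed this key on first sight, then update it. */
    long processElement(String key) {
        Long current = keyedState.get(key);
        if (current == null) {
            // First element for this key: take the precomputed value, if any,
            // and drop it from the snapshot so memory shrinks as keys are seen.
            Long seeded = snapshot.remove(key);
            current = (seeded != null) ? seeded : 0L;
        }
        long updated = current + 1;
        keyedState.put(key, updated);
        return updated;
    }
}
```

With parallelism above one, each subtask in this sketch would load the full snapshot even though it only ever sees its own share of the keys; that is wasteful but harmless for small state, and for large state the bootstrap job could write one file per partition instead.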