Hi Yun and Dawid,

Dawid is correct in that:

```
BATCH = pipelined scheduling with region failover + blocking keyBy shuffles
(all pointwise shuffles pipelined)
STREAM = eager scheduling with checkpointing + pipelined keyBy shuffles
AUTOMATIC = choose based on sources (ALL bounded == BATCH, STREAMING otherwise)
```
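For readers following the thread, the AUTOMATIC rule summarized above ("ALL bounded == BATCH, STREAMING otherwise") can be sketched as follows. This is a minimal illustration of the decision logic only, not Flink code; `Source` and `select_execution_mode` are hypothetical names introduced for this sketch.

```python
# Sketch of the AUTOMATIC execution-mode decision described in the thread.
# All names here are hypothetical illustrations, not part of the Flink API.

from dataclasses import dataclass
from typing import List


@dataclass
class Source:
    name: str
    bounded: bool  # True if the source has a finite input


def select_execution_mode(sources: List[Source]) -> str:
    """AUTOMATIC: BATCH only if ALL sources are bounded, STREAMING otherwise."""
    if sources and all(s.bounded for s in sources):
        return "BATCH"
    return "STREAMING"


# A job mixing a bounded file source with an unbounded Kafka source
# must run in STREAMING mode under AUTOMATIC.
mixed = [Source("files", True), Source("kafka", False)]
print(select_execution_mode(mixed))                    # STREAMING
print(select_execution_mode([Source("files", True)]))  # BATCH
```

The point of the all-or-nothing rule, as Kostas explains further down in the thread, is that a single unbounded source anywhere in the graph forces STREAMING scheduling for the whole job.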
As for allowing users to set the shuffle mode, we can consider it, but I think we should be careful: if checkpointing is enabled (e.g. in STREAMING), then introducing a blocking shuffle will cause problems. The opposite, allowing pipelined execution for keyBy shuffles in BATCH, may be ok.

Kostas

On Tue, Aug 18, 2020 at 11:40 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> Hi all,
>
> @Klou Nice write-up. One comment I have is that I would suggest using a
> different configuration parameter name. The way I understand the proposal,
> BATCH/STREAMING/AUTOMATIC affects not only the scheduling mode but the
> types of shuffles as well. How about `execution.mode`? Or
> `execution-runtime-mode`?
>
> @Yun The way I understand it:
>
> BATCH = pipelined scheduling with region failover + blocking keyBy shuffles
> (all pointwise shuffles pipelined)
>
> STREAM = eager scheduling with checkpointing + pipelined keyBy shuffles
>
> AUTOMATIC = choose based on sources
>
> Power users could still override any shuffle modes in
> PartitionTransformation. If we find more people interested in controlling
> the type of shuffles, we can think of exposing that in the DataStream API
> as well in the future.
>
> Best,
>
> Dawid
>
> On 18/08/2020 06:18, Yun Gao wrote:
>
> Hi,
>
> Thanks very much for bringing up this discussion!
>
> One more question: do the BATCH and STREAMING modes also decide the
> shuffle types and operators? I am asking because even in blocking mode,
> a job should benefit from keeping some edges pipelined if the resources
> are known to be enough. Do we also consider exposing more fine-grained
> control over the shuffle types?
> Best,
> Yun
>
>
> ------------------Original Mail ------------------
> Sender: Kostas Kloudas <kklou...@apache.org>
> Send Date: Tue Aug 18 02:24:21 2020
> Recipients: David Anderson <da...@alpinegizmo.com>
> CC: dev <d...@flink.apache.org>, user <user@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input
>>
>> Hi Kurt and David,
>>
>> Thanks a lot for the insightful feedback!
>>
>> @Kurt: On the topic of checkpointing with batch scheduling, I totally
>> agree with you that it requires a lot more work and careful thinking
>> about the semantics. This FLIP was written under the assumption that if
>> the user wants to have checkpoints on bounded input, he/she will have
>> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
>> can be handled as a separate topic in the future.
>>
>> In the case of MIXED workloads, and for this FLIP, the scheduling mode
>> should be set to STREAMING. That is why the AUTOMATIC option sets
>> scheduling to BATCH only if all the sources are bounded. I am not sure
>> what the plans are at the scheduling level, as one could imagine that
>> in the future, for mixed workloads, we first schedule all the bounded
>> subgraphs in BATCH mode and allow only one UNBOUNDED subgraph per
>> application, which is scheduled after all bounded ones have finished.
>> Essentially, the bounded subgraphs would be used to bootstrap the
>> unbounded one. But I am not aware of any plans in that direction.
>>
>>
>> @David: The handling of processing-time timers is a topic that has
>> also been discussed in the community in the past, and unfortunately I
>> do not remember any final conclusion.
>>
>> In the current context, and for bounded input, we chose to favor
>> reproducibility of the result, as this is expected in batch processing,
>> where the whole input is available in advance. This is why this
>> proposal suggests not allowing processing-time timers.
>> But I understand your argument that the user may want to be able to
>> run the same pipeline on batch and streaming, and this is why we added
>> the two options under future work, namely (from the FLIP):
>>
>> ```
>> Future Work: In the future we may consider adding as options the capability of:
>> * firing all the registered processing time timers at the end of a job
>> (at close()) or,
>> * ignoring all the registered processing time timers at the end of a job.
>> ```
>>
>> Conceptually, we are essentially saying that batch execution is assumed
>> to be instantaneous and to refer to a single "point" in time, and any
>> processing-time timers for the future may fire at the end of execution
>> or be ignored (but not throw an exception). I could also see ignoring
>> the timers in batch as the default, if this makes more sense.
>>
>> By the way, do you have any use cases in mind that would help us better
>> shape our processing-time timer handling?
>>
>> Kostas
>>
>> On Mon, Aug 17, 2020 at 2:52 PM David Anderson <da...@alpinegizmo.com> wrote:
>> >
>> > Kostas,
>> >
>> > I'm pleased to see some concrete details in this FLIP.
>> >
>> > I wonder if the current proposal goes far enough in the direction of
>> > recognizing the need some users may have for "batch" and "bounded
>> > streaming" to be treated differently. If I've understood it correctly,
>> > the section on scheduling allows me to choose STREAMING scheduling
>> > even if I have bounded sources. I like that approach, because it
>> > recognizes that even though I have bounded inputs, I don't necessarily
>> > want batch processing semantics. I think it makes sense to extend this
>> > idea to processing time support as well.
>> >
>> > My thinking is that sometimes in development and testing it's
>> > reasonable to run exactly the same job as in production, except with
>> > different sources and sinks.
>> > While it might be a reasonable default, I'm not convinced that
>> > switching a processing-time streaming job to read from a bounded
>> > source should always cause it to fail.
>> >
>> > David
>> >
>> > On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas <kklou...@apache.org> wrote:
>> >>
>> >> Hi all,
>> >>
>> >> As described in FLIP-131 [1], we are aiming at deprecating the
>> >> DataSet API in favour of the DataStream API and the Table API. After
>> >> this work is done, the user will be able to write a program using the
>> >> DataStream API and it will execute efficiently on both bounded and
>> >> unbounded data. But before we reach this point, it is worth
>> >> discussing and agreeing on the semantics of some operations as we
>> >> transition from the streaming world to the batch one.
>> >>
>> >> This thread and the associated FLIP [2] aim at discussing these
>> >> issues, as these topics are pretty important to users and can lead to
>> >> unpleasant surprises if we do not pay attention.
>> >>
>> >> Let's have a healthy discussion here, and I will be updating the FLIP
>> >> accordingly.
>> >>
>> >> Cheers,
>> >> Kostas
>> >>
>> >> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
>> >> [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522
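As an editor's note on the future-work item Kostas quotes from the FLIP: the two proposed end-of-job policies for leftover processing-time timers ("fire all at close()" vs. "ignore all") can be made concrete with a small self-contained sketch. This is illustrative only, not Flink code; `TimerService`, its methods, and the policy names are all hypothetical names invented for this example.

```python
# Illustrative sketch of the two end-of-job policies for leftover
# processing-time timers proposed in the FLIP's future-work section.
# All names here are hypothetical; this is not Flink code.

from typing import Callable, Dict, List


class TimerService:
    def __init__(self, policy: str):
        assert policy in ("FIRE_AT_END", "IGNORE")
        self.policy = policy
        self.timers: Dict[int, Callable[[int], None]] = {}

    def register(self, timestamp: int, callback: Callable[[int], None]) -> None:
        self.timers[timestamp] = callback

    def close(self) -> List[int]:
        """Called when the bounded job ends; returns timestamps that fired."""
        fired: List[int] = []
        if self.policy == "FIRE_AT_END":
            # Fire every remaining timer, in timestamp order, at close().
            for ts in sorted(self.timers):
                self.timers[ts](ts)
                fired.append(ts)
        # IGNORE: remaining timers are silently dropped; neither policy
        # throws an exception, matching the FLIP's framing.
        self.timers.clear()
        return fired


svc = TimerService("FIRE_AT_END")
svc.register(2000, lambda ts: None)
svc.register(1000, lambda ts: None)
print(svc.close())  # [1000, 2000]
```

Either policy preserves the FLIP's core point that batch execution is treated as a single instant: timers set for "the future" either all fire at that instant's end or never fire, rather than causing a failure.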