I agree that ultimately users shouldn't have to decide which mode they
are executing in. Batch or streaming execution should be a decision made
behind the scenes. We're not quite there yet; I would argue not even on
paper:
> And as Luke said, conceptually the model supports graphs that mix
> bounded and unbounded PCollections, and I can imagine runners that
> could implement that.

The Flink Runner can definitely do that. However, the model is not
prepared for this. For instance, how do you generate watermarks for
bounded sources? As far as I can see, you would have to wrap them in an
unbounded source to support watermark generation, or let the backend
generate watermarks automatically based on the timestamps seen. Please
correct me if I'm missing something here.
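To make the wrapping idea concrete, below is a rough, untested sketch of
an UnboundedReader that delegates to a BoundedReader and derives its
watermark from the timestamps it has seen. Checkpointing, splitting, and
the enclosing UnboundedSource are left out, and the names are only my
reading of the current Source API, so treat this as an illustration
rather than a working implementation:

import java.io.IOException;
import java.util.NoSuchElementException;

import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

/** Sketch: exposes a BoundedReader as an UnboundedReader that can answer watermark queries. */
class BoundedAsUnboundedReader<T> extends UnboundedSource.UnboundedReader<T> {

  private final BoundedSource.BoundedReader<T> delegate;
  private Instant lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
  private boolean finished = false;

  BoundedAsUnboundedReader(BoundedSource.BoundedReader<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean start() throws IOException {
    return track(delegate.start());
  }

  @Override
  public boolean advance() throws IOException {
    return track(delegate.advance());
  }

  // Remember the timestamp of the record we just read, so that
  // getWatermark() can report progress based on the timestamps seen.
  private boolean track(boolean hasRecord) {
    finished = !hasRecord;
    if (hasRecord) {
      lastTimestamp = delegate.getCurrentTimestamp();
    }
    return hasRecord;
  }

  @Override
  public T getCurrent() throws NoSuchElementException {
    return delegate.getCurrent();
  }

  @Override
  public Instant getCurrentTimestamp() throws NoSuchElementException {
    return delegate.getCurrentTimestamp();
  }

  @Override
  public Instant getWatermark() {
    // While reading, the watermark trails the timestamps seen so far.
    // Once the bounded input is exhausted, jump to the end of time so
    // that all downstream windows can fire.
    return finished ? BoundedWindow.TIMESTAMP_MAX_VALUE : lastTimestamp;
  }

  @Override
  public UnboundedSource.CheckpointMark getCheckpointMark() {
    // A real implementation would checkpoint the reader's position;
    // replaying a bounded input from scratch keeps the sketch simple.
    return new UnboundedSource.CheckpointMark() {
      @Override
      public void finalizeCheckpoint() {}
    };
  }

  @Override
  public UnboundedSource<T, ?> getCurrentSource() {
    // Omitted: would return the UnboundedSource that wraps the bounded one.
    throw new UnsupportedOperationException("not part of this sketch");
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}

The interesting bit is getWatermark(): it is essentially the "generate
watermarks automatically based on the timestamps seen" option, with the
end-of-input case handled by advancing the watermark to infinity so all
downstream windows can fire.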
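For reference, the pipeline Bill describes further down in this thread
would look roughly like the following; the step numbers refer to his
list. Everything here is assumption-laden: TextIO reading from an
hdfs:// path is exactly what the JIRA issue is about and does not work
yet, MyKafkaSource stands in for the custom unbounded source you would
currently have to write to control timestamps and watermarks, and
parseEventTime is a placeholder for whatever extracts the event time
from a record.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class HistoricalPlusLive {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Steps 4 and 6: bounded historical data from HDFS (hypothetical path),
    // re-stamped with event time. Note this only fixes timestamps; the
    // watermarks would still have to come from the source, as per my
    // earlier mail below.
    PCollection<String> historical =
        p.apply("ReadHistorical", TextIO.read().from("hdfs://namenode/timeseries/*"))
         .apply("StampHistorical", WithTimestamps.of(HistoricalPlusLive::parseEventTime));

    // Steps 5 and 6: unbounded live data. MyKafkaSource is a hypothetical
    // custom UnboundedSource that assigns event timestamps and watermarks.
    PCollection<String> live =
        p.apply("ReadLive", Read.from(new MyKafkaSource("broker:9092", "readings")));

    // Steps 7 and 8: flatten both collections and window them uniformly.
    PCollection<String> windowed =
        PCollectionList.of(historical).and(live)
            .apply("FlattenSources", Flatten.<String>pCollections())
            .apply("FixedWindows",
                Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));

    // Step 9: windowed statistics over `windowed` go here.

    p.run();
  }

  private static Instant parseEventTime(String record) {
    // Placeholder: extract the record's event time, e.g. from a CSV column.
    return Instant.parse(record.split(",")[0]);
  }
}

Everything after the Flatten is agnostic to where a record came from,
which is exactly the property Bill is after.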
On Wed, Mar 30, 2016 at 6:39 PM, Frances Perry <[email protected]> wrote:
>
> On Wed, Mar 30, 2016 at 9:21 AM, Maximilian Michels <[email protected]>
> wrote:
>>
>> Hi Bill,
>>
>> By "streaming mode" I meant Beam's built-in flag in PipelineOptions
>> to indicate batch or stream execution mode.
>
> The Beam model concept is bounded/unbounded data. I think of that
> streaming flag as actually part of the Cloud Dataflow runner --
> currently the DataflowPipelineRunner is basically two runners (one
> batch, one streaming) packaged as one and configured via that flag.
>
> Ideally, I'd remove that flag entirely and have the
> DataflowPipelineRunner (and others) choose how to execute based on
> properties of the graph. Currently that mapping is all unbounded
> collections -> streaming, all bounded collections -> batch. But that
> doesn't have to be the case. An incremental 'batch' runner could
> totally handle unbounded PCollections. And as Luke said, conceptually
> the model supports graphs that mix bounded and unbounded PCollections,
> and I can imagine runners that could implement that.
>
>> Depending on this flag, the Flink Runner translates to either Flink's
>> DataSet (batch) or DataStream (stream) API. The DataStream API can
>> also read from bounded collections like file sources.
>>
>> Your mental model is by no means way off. Reading from bounded
>> sources in a streaming program can make sense. However, the typical
>> use case we have seen for streaming in the past is to read from an
>> unbounded source (e.g. Kafka) and process all incoming records using
>> event time / windows / triggering as they arrive. This is a great
>> advantage over the so-called Lambda architecture, where you typically
>> have an approximate real-time streaming layer and a batch layer that
>> does the heavy lifting.
>>
>> The typical use case I see for reading from a bounded collection in
>> streaming is when you have static data (like an ML model or a stop
>> word list). These would typically be kept as state. On the other
>> hand, your use case would also work if you assigned timestamps and
>> watermarks correctly for the HDFS source. You can then union the
>> streams and process the resulting stream correctly according to time.
>> That is the beauty of event time.
>>
>> By default, the Flink Runner assigns ingestion time for Kafka. That
>> means that timestamps/watermarks are assigned according to the
>> current system time. This is probably not what you want when you want
>> to combine these two streams. You can change timestamps between
>> operators, but you can only assign watermarks at the source. Thus, to
>> assign watermarks from Kafka or a file source, you would have to
>> implement a custom Beam source.
>>
>> Cheers,
>> Max
>>
>> On Wed, Mar 30, 2016 at 4:37 PM, Bill McCarthy
>> <[email protected]> wrote:
>> > Thanks Max,
>> >
>> > I'm unsure what you mean by the "batch part" and "streaming
>> > execution"... Are you saying that I have to run my entire pipeline
>> > in either batch or streaming mode?
>> >
>> > Perhaps a brief description of what I'm trying to do would help:
>> >
>> > 1. I've got some historical intervalized timeseries data.
>> > 2. I've also got some live intervalized timeseries data.
>> > 3. I want to process both those flows of data in a uniform fashion:
>> >    calculating windowed statistics.
>> > 4. My thought was that I'd have the historical data stored in some
>> >    easy data store for Beam (e.g. HDFS).
>> > 5. I'd put live data on Kafka.
>> > 6. Then load up 2 PCollections, one from HDFS and one from Kafka.
>> > 7. Then perform a Flatten transform to get them into one
>> >    PCollection.
>> > 8. Then run my windowing over the flattened PCollection.
>> > 9. ...
>> >
>> > I like this approach because my transforms don't have to care about
>> > whether data is live or historical, and I can have one processing
>> > pipeline for both.
>> >
>> > Am I barking up the wrong tree? Is my mental model way off with
>> > respect to how to combine historical and live data?
>> >
>> > Thanks
>> >
>> > Bill
>> >
>> > On Mar 30, 2016, at 5:59 AM, Maximilian Michels <[email protected]>
>> > wrote:
>> >
>> > Hi Bill,
>> >
>> > The batch part of the Flink Runner supports reading from a finite
>> > collection, but I'm assuming you're working with the streaming
>> > execution of the Flink Runner. We haven't implemented support in
>> > the runner yet, but Flink natively supports reading from finite
>> > sources, so it looks fairly easy to implement. I've filed a JIRA
>> > issue and would like to look into this later today.
>> >
>> > I've recently added a test case (UnboundedSourceITCase) which
>> > throws a custom exception when the end of the output has been
>> > reached. Admittedly, that is not a very nice approach, but it
>> > works. All the more reason we should support finite sources in
>> > streaming mode.
>> >
>> > Best,
>> > Max
>> >
>> > On Wed, Mar 30, 2016 at 2:43 AM, William McCarthy
>> > <[email protected]> wrote:
>> >> Hi,
>> >>
>> >> I want to get access to a bounded PCollection in Beam on the Flink
>> >> runner. Ideally, I’d pull from HDFS. Is that supported? If not,
>> >> what would be the best way for me to get something bounded, for
>> >> testing purposes?
>> >>
>> >> Thanks,
>> >>
>> >> Bill
