Though without the spelling mistake :-)

On Wed, Mar 30, 2016 at 10:46 AM, Mark Shields <[email protected]>
wrote:

> Makes sense. I wonder if:
>  s/Bounded/Seekable/g
>  s/Unbounded/Streamed/g
> ?
>
> On Wed, Mar 30, 2016 at 10:20 AM, Maximilian Michels <[email protected]>
> wrote:
>
>> That is actually what Flink does. It sends out a +inf watermark once
>> the source completes. This may be intuitive for the developer, but
>> users will need a little more API sugar to conveniently use bounded
>> sources in streaming programs.
>>
>> On Wed, Mar 30, 2016 at 7:15 PM, Mark Shields <[email protected]>
>> wrote:
>> > Though BoundedSource must indeed be bounded (it supports splitting which
>> > requires the overall size to be known), there's nothing preventing an
>> > UnboundedSource from being bounded in the sense it may advance the
>> > watermark to +inf when it has reached its 'end'. Google's streaming
>> > runner doesn't do anything with that at the moment, but I could imagine
>> > that a 'streaming' or 'incremental batch' runner could shut itself down
>> > once all sink watermarks have gone to +inf.
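>> >
>> > Concretely, I mean something like this fragment of a custom reader (a
>> > sketch only; the 'exhausted' flag and lastTimestamp are hypothetical
>> > fields of the reader):
>> >
>> >   // Inside a custom UnboundedSource.UnboundedReader: once the finite
>> >   // backing data is exhausted, report a final watermark of +inf so the
>> >   // runner knows no more data will ever arrive.
>> >   @Override
>> >   public Instant getWatermark() {
>> >     return exhausted
>> >         ? BoundedWindow.TIMESTAMP_MAX_VALUE  // 'end' reached: watermark -> +inf
>> >         : lastTimestamp;                     // otherwise track event time seen
>> >   }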
>> >
>> > On Wed, Mar 30, 2016 at 9:55 AM, Maximilian Michels <[email protected]>
>> wrote:
>> >>
>> >> I agree that ultimately users shouldn't have to decide which mode
>> >> they're executing. Batch or streaming execution should be a decision
>> >> made behind the scenes. We're not quite there yet. I would argue we're
>> >> not even there on paper:
>> >>
>> >> > And as Luke said, conceptually the model supports graphs that mix
>> >> > bounded and unbounded PCollections, and I can imagine runners that
>> >> > could implement that.
>> >>
>> >> The Flink Runner can definitely do that. However, the model is not
>> >> prepared for this. For instance, how do you generate watermarks for
>> >> Bounded sources? As far as I can see, you'll have to wrap them in an
>> >> unbounded source to support watermark generation, or let the backend
>> >> generate watermarks automatically based on the timestamps seen. Please
>> >> correct me if I'm missing something here.
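>> >>
>> >> By "automatically based on the timestamps seen" I'd imagine a heuristic
>> >> like this sketch (the slack is an assumed tuning knob, not anything in
>> >> the SDK):
>> >>
>> >>   // Sketch: derive a watermark from observed event timestamps by
>> >>   // holding it back a fixed slack behind the max timestamp seen.
>> >>   Instant watermarkFromObserved(Instant maxTimestampSeen, Duration slack) {
>> >>     return maxTimestampSeen.minus(slack);
>> >>   }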
>> >>
>> >> On Wed, Mar 30, 2016 at 6:39 PM, Frances Perry <[email protected]> wrote:
>> >> >
>> >> >
>> >> > On Wed, Mar 30, 2016 at 9:21 AM, Maximilian Michels <[email protected]>
>> >> > wrote:
>> >> >>
>> >> >> Hi Bill,
>> >> >>
>> >> >> By "streaming mode" I meant Beam's built-in flag in PipelineOptions
>> to
>> >> >> indicate batch or stream execution mode.
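>> >> >>
>> >> >> For reference, the flag is set like this (a minimal sketch, assuming
>> >> >> the runner consults StreamingOptions):
>> >> >>
>> >> >>   PipelineOptions options = PipelineOptionsFactory.create();
>> >> >>   options.as(StreamingOptions.class).setStreaming(true);  // streaming mode
>> >> >>   Pipeline p = Pipeline.create(options);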
>> >> >
>> >> >
>> >> > The Beam model concept is bounded/unbounded data. I think of that
>> >> > streaming
>> >> > flag as actually part of the Cloud Dataflow runner -- currently the
>> >> > DataflowPipelineRunner is basically two runners (one batch, one
>> >> > streaming)
>> >> > packaged as one and configured via that flag.
>> >> >
>> >> > Ideally, I'd remove that flag entirely and have the
>> >> > DataflowPipelineRunner (and others) choose how to execute based on
>> >> > properties of the graph. Currently that mapping is all unbounded
>> >> > collections -> streaming, all bounded collections -> batch. But that
>> >> > doesn't have to be the case. An incremental 'batch' runner could
>> >> > totally handle unbounded PCollections. And as Luke said, conceptually
>> >> > the model supports graphs that mix bounded and unbounded PCollections,
>> >> > and I can imagine runners that could implement that.
>> >> >
>> >> >
>> >> >>
>> >> >> Depending on this flag, the Flink Runner translates to either
>> >> >> Flink's DataSet (batch) or DataStream (streaming) API. The
>> >> >> DataStream API can also read from bounded collections like file
>> >> >> sources.
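>> >> >>
>> >> >> For example, in plain Flink (a sketch; the path is made up):
>> >> >>
>> >> >>   // A bounded file read used inside Flink's DataStream (streaming) API.
>> >> >>   StreamExecutionEnvironment env =
>> >> >>       StreamExecutionEnvironment.getExecutionEnvironment();
>> >> >>   DataStream<String> lines = env.readTextFile("hdfs:///path/to/input");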
>> >> >>
>> >> >> Your mental model is by no means way off. Reading from bounded
>> >> >> sources in a streaming program can make sense. However, the typical
>> >> >> use case we have seen for streaming in the past is to read from an
>> >> >> unbounded source (e.g. Kafka) and process all incoming records using
>> >> >> event time / windows / triggering as they arrive. This is a great
>> >> >> advantage over the so-called Lambda architecture, where you typically
>> >> >> have an approximate real-time streaming layer and a batch layer that
>> >> >> does the heavy lifting.
>> >> >>
>> >> >> The typical use case I see for reading from a bounded collection in
>> >> >> streaming is when you have static data (like an ML model or a stop
>> >> >> word list). These would typically be kept as state. On the other
>> >> >> hand, your use case would also work if you assigned timestamps and
>> >> >> watermarks correctly for the HDFS source. You can then union the
>> >> >> streams and process the resulting stream correctly according to
>> >> >> time. That is the beauty of event time.
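>> >> >>
>> >> >> Roughly like this (a sketch; Record and extractEventTime() are
>> >> >> placeholders for your own types):
>> >> >>
>> >> >>   // Stamp historical records with their event times, then union the
>> >> >>   // two streams into one PCollection for uniform windowing.
>> >> >>   PCollection<Record> stamped = historical.apply(
>> >> >>       WithTimestamps.of((Record r) -> extractEventTime(r)));
>> >> >>   PCollection<Record> merged = PCollectionList.of(stamped).and(live)
>> >> >>       .apply(Flatten.pCollections());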
>> >> >>
>> >> >> By default, the Flink Runner assigns ingestion time for Kafka. That
>> >> >> means that timestamps/watermarks are assigned according to the
>> >> >> current system time. This is probably not what you want when you
>> >> >> want to combine these two streams. You can change timestamps between
>> >> >> operators, but you can only assign watermarks at the source. Thus,
>> >> >> to assign watermarks from Kafka or a file source, you would have to
>> >> >> implement a custom Beam source.
>> >> >>
>> >> >> Cheers,
>> >> >> Max
>> >> >>
>> >> >>
>> >> >> On Wed, Mar 30, 2016 at 4:37 PM, Bill McCarthy
>> >> >> <[email protected]> wrote:
>> >> >> > Thanks Max,
>> >> >> >
>> >> >> > I'm unsure what you mean by the "batch part" and "streaming
>> >> >> > execution"... Are you saying that I have to run my entire pipeline
>> >> >> > in either batch or streaming mode?
>> >> >> >
>> >> >> > Perhaps a brief description of what I'm trying to do would help:
>> >> >> >
>> >> >> > 1. I've got some historical intervalized timeseries data
>> >> >> > 2. I've also got some live intervalized timeseries data
>> >> >> > 3. I want to process both those flows of data in a uniform fashion:
>> >> >> > calculating windowed statistics
>> >> >> > 4. My thought was that I'd have the historical data stored in some
>> >> >> > data store that's easy for Beam to read (e.g. HDFS)
>> >> >> > 5. I'd put live data on Kafka
>> >> >> > 6. Then load up 2 PCollections, one from HDFS and one from Kafka
>> >> >> > 7. Then perform a Flatten transform to get them into one PCollection
>> >> >> > 8. Then run my windowing over the flattened PCollection
>> >> >> > 9. ...
>> >> >> >
>> >> >> > I like this approach, because my transforms don't have to care
>> >> >> > about whether data is live or historical, and I can have one
>> >> >> > processing pipeline for both; see the rough sketch below.
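>> >> >> >
>> >> >> > Pseudocode sketch (ParseTickFn, KafkaTickSource and StatsFn are
>> >> >> > just placeholders for illustration):
>> >> >> >
>> >> >> >   PCollection<Tick> historical =
>> >> >> >       p.apply(TextIO.Read.from("hdfs://..."))       // 4: bounded history
>> >> >> >        .apply(ParDo.of(new ParseTickFn()));
>> >> >> >   PCollection<Tick> live =
>> >> >> >       p.apply(Read.from(new KafkaTickSource()));    // 5: unbounded live feed
>> >> >> >   PCollection<Tick> all = PCollectionList.of(historical).and(live)
>> >> >> >       .apply(Flatten.pCollections());               // 7: one PCollection
>> >> >> >   all.apply(Window.<Tick>into(FixedWindows.of(Duration.standardMinutes(5))))
>> >> >> >      .apply(Combine.globally(new StatsFn()).withoutDefaults());   // 8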
>> >> >> >
>> >> >> > Am I barking up the wrong tree? Is my mental model way off, with
>> >> >> > respect to how to combine historical and live data?
>> >> >> >
>> >> >> > Thanks
>> >> >> >
>> >> >> > Bill
>> >> >> >
>> >> >> > On Mar 30, 2016, at 5:59 AM, Maximilian Michels <[email protected]>
>> >> >> > wrote:
>> >> >> >
>> >> >> > Hi Bill,
>> >> >> >
>> >> >> > The batch part of the Flink Runner supports reading from a finite
>> >> >> > collection, but I'm assuming you're working with the streaming
>> >> >> > execution of the Flink Runner. We haven't implemented support in
>> >> >> > the Runner yet, but Flink natively supports reading from finite
>> >> >> > sources, so it looks fairly easy to implement. I've filed a JIRA
>> >> >> > issue and would like to look into this later today.
>> >> >> >
>> >> >> > I've recently added a test case (UnboundedSourceITCase) which
>> >> >> > throws a custom exception when the end of the output has been
>> >> >> > reached. Admittedly, that is not a very nice approach, but it
>> >> >> > works. All the more reason we should support finite sources in
>> >> >> > streaming mode.
>> >> >> >
>> >> >> > Best,
>> >> >> > Max
>> >> >> >
>> >> >> > On Wed, Mar 30, 2016 at 2:43 AM, William McCarthy
>> >> >> > <[email protected]> wrote:
>> >> >> >>
>> >> >> >> Hi,
>> >> >> >>
>> >> >> >> I want to get access to a bounded PCollection in Beam on the
>> >> >> >> Flink runner. Ideally, I’d pull from HDFS. Is that supported? If
>> >> >> >> not, what would be the best way for me to get something bounded,
>> >> >> >> for testing purposes?
>> >> >> >>
>> >> >> >> Thanks,
>> >> >> >>
>> >> >> >> Bill
>> >> >> >
>> >> >> >
>> >> >
>> >> >
>> >
>> >
>>
>
>
