Hi Bill,

By "streaming mode" I meant Beam's built-in flag in PipelineOptions to
indicate batch or stream execution mode. Depending on this flag, the
Flink Runner translates to either Flink's DataSet (batch) or
DataStream (stream) API. The DataStream API can also read from bounded
collections like file sources.
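
For illustration, here is a rough sketch of setting that flag from
the Java SDK (class and option names vary a bit across Beam versions,
so take this as an approximation rather than gospel):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.StreamingOptions;

    public class StreamingFlagExample {
      public static void main(String[] args) {
        // --streaming=true selects the DataStream (streaming)
        // translation path on the Flink Runner; false selects the
        // DataSet (batch) path.
        StreamingOptions options =
            PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
        options.setStreaming(true);

        Pipeline p = Pipeline.create(options);
        // ... build and run the pipeline ...
      }
    }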

Your mental model is by no means way off. Reading from bounded sources
in a streaming program can make sense. However, the typical use case
we have seen for streaming in the past is to read from an unbounded
source (e.g. Kafka) and process all incoming records using event time
/ windows / triggering as they arrive. This is a great advantage over
the so-called Lambda architecture, where you typically have an
approximate real-time streaming layer and a batch layer that does the
heavy lifting.

The typical use case I see for reading from a bounded collection in
streaming is when you have static data (like an ML model or a
stop-word list). That data would typically be kept as state. On the
other hand, your use case would also work if you assigned timestamps
and watermarks correctly for the HDFS source. You can then union the
streams and process the resulting stream correctly according to event
time. That is the beauty of event time.
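
To make that concrete, here is a rough sketch of the union-then-window
shape. Create.of stands in for the real HDFS and Kafka reads, which
would each need proper event-time timestamps/watermarks:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;
    import org.joda.time.Duration;

    public class UnionExample {
      public static void main(String[] args) {
        Pipeline p =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Stand-ins for the bounded (HDFS) and unbounded (Kafka) reads.
        PCollection<String> historical =
            p.apply("Historical", Create.of("a", "b"));
        PCollection<String> live = p.apply("Live", Create.of("c", "d"));

        // Union the two; downstream transforms no longer care which
        // elements are historical and which are live.
        PCollection<String> all =
            PCollectionList.of(historical).and(live)
                .apply(Flatten.pCollections());

        // Window and aggregate purely by event time.
        all.apply(Window.<String>into(
                FixedWindows.of(Duration.standardMinutes(5))))
           .apply(Count.perElement());

        p.run();
      }
    }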

By default, the Flink Runner assigns ingestion time for Kafka. That
means timestamps and watermarks are assigned according to the current
system time at the source. This is probably not what you want when
combining these two streams. You can change element timestamps
between operators, but you can only assign watermarks at the source.
Thus, to assign event-time watermarks for Kafka or a file source, you
would have to implement a custom Beam source.
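
For reference, "changing timestamps between operators" would look
roughly like the DoFn below (annotation-style DoFn from newer SDKs;
the KV carrying an epoch-millis event time is just an assumed record
shape):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    // Re-stamps each element with the event time carried in the
    // record. This only changes element timestamps; it does NOT move
    // the watermark, which is set at the source as described above.
    class AssignEventTime extends DoFn<KV<String, Long>, KV<String, Long>> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.outputWithTimestamp(c.element(), new Instant(c.element().getValue()));
      }

      @Override
      public Duration getAllowedTimestampSkew() {
        // Required when the new timestamp may lie before the element's
        // current (e.g. ingestion-time) timestamp.
        return Duration.standardDays(365);
      }
    }

You would apply it right after the Kafka read, e.g.
kafkaStream.apply(ParDo.of(new AssignEventTime())).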

Cheers,
Max


On Wed, Mar 30, 2016 at 4:37 PM, Bill McCarthy
<[email protected]> wrote:
> Thanks Max,
>
> I'm unsure what you mean by the "batch part" and "streaming execution"...
> Are you saying that I have to run my entire pipeline in either batch or
> streaming modes?
>
> Perhaps a brief description of what I'm trying to do would help:
>
> 1. I've got some historical intervalized timeseries data
> 2. I've also got some live intervalized timeseries data
> 3. I want to process both those flows of data in a uniform fashion:
> calculating windowed statistics
> 4. My thought was that I'd have the historical data stored in some easy data
> store for Beam (e.g. HDFS)
> 5. I'd put live data on Kafka
> 6. Then load up 2 PCollections, one from HDFS and one from Kafka
> 7. Then perform a Flatten transform to get them in one PCollection
> 8. Then run my windowing over the flattened PCollection
> 9. ...
>
> I like this approach, because my transforms don't have to care about whether
> data is live or historical and I can have one processing pipeline for both.
>
> Am I barking up the wrong tree? Is my mental model way off, with respect to
> how to combine historical and live data?
>
> Thanks
>
> Bill
>
> On Mar 30, 2016, at 5:59 AM, Maximilian Michels <[email protected]> wrote:
>
> Hi Bill,
>
> The batch part of the Flink Runners supports reading from a finite
> collection, but I'm assuming you're working with the streaming execution of
> the Flink runner. We haven't implemented support in the Runner yet but Flink
> natively supports reading from finite sources. So it looks fairly easy to
> implement. I've filed a JIRA issue and would like to look into this later
> today.
>
> I've recently added a test case (UnboundedSourceITCase) which throws a
> custom exception when the end of the output has been reached. Admittedly,
> that is not a very nice approach but it works. All the more, we should
> support finite sources in streaming mode.
>
> Best,
> Max
>
> On Wed, Mar 30, 2016 at 2:43 AM, William McCarthy
> <[email protected]> wrote:
>>
>> Hi,
>>
>> I want to get access to a bounded PCollection in Beam on the Flink runner.
>> Ideally, I’d pull from HDFS. Is that supported? If not, what would be the
>> best way for me to get something bounded, for testing purposes?
>>
>> Thanks,
>>
>> Bill
>
>
