Hi!

The PR just got submitted. You can play with SDF in the Dataflow streaming
runner now :) Hope it doesn't get rolled back (fingers crossed)...

On Mon, Jun 19, 2017 at 6:06 PM Eugene Kirpichov <[email protected]>
wrote:

> Hi,
> The PR is ready and I'm just struggling with setup of tests - Dataflow
> ValidatesRunner tests currently don't have a streaming execution.
> I think +Kenn Knowles <[email protected]> was doing something about that, or
> I might find a workaround.
>
> But basically if you want to experiment - if you patch in the PR, you can
> experiment with SDF in Dataflow in streaming mode. It passes tests against
> the current production Dataflow Service.
>
>
> On Thu, Jun 15, 2017 at 8:54 AM peay <[email protected]> wrote:
>
>> Eugene, would you have an ETA on when splittable DoFn would be available
>> in Dataflow in batch/streaming mode? I see that
>> https://github.com/apache/beam/pull/1898 is still active
>>
>> I've started to experiment with those using the DirectRunner and this is
>> a great API.
>>
>> Thanks!
>>
>> -------- Original Message --------
>> Subject: Re: Using watermarks with bounded sources
>>
>> Local Time: April 23, 2017 10:18 AM
>> UTC Time: April 23, 2017 2:18 PM
>> From: [email protected]
>> To: Eugene Kirpichov <[email protected]>
>> [email protected] <[email protected]>
>>
>> Ah, I didn't know about that. This is *really* great -- from a quick
>> look, the API looks both very natural and very powerful. Thanks a lot for
>> getting this into Beam!
>>
>> I see Flink support seems to have been merged already. Any idea on when
>> https://github.com/apache/beam/pull/1898 will get merged?
>>
>> I see updateWatermark in the API but not in the proposal's examples, which
>> only use resume/withFutureOutputWatermark. Any reason why updateWatermark
>> is not called after each output in the examples from the proposal? I guess
>> it would be "too fine-grained" to update it for each individual record
>> of a mini-batch?
>>
>> In my case with existing hourly files, would `outputElement(01:00 file),
>> updateWatermark(01:00), outputElement(02:00), updateWatermark(02:00), ...`
>> be the proper way to output per-hour elements while gradually moving the
>> watermark forward while going through an existing list? Or would you
>> instead suggest still using resume (potentially with very small timeouts)?
>>
>> Thanks,
>>
>> -------- Original Message --------
>> Subject: Re: Using watermarks with bounded sources
>> Local Time: 22 April 2017 3:59 PM
>> UTC Time: 22 April 2017 19:59
>> From: [email protected]
>> To: peay <[email protected]>, [email protected] <
>> [email protected]>
>>
>> Hi! This is an excellent question; don't have time to reply in much
>> detail right now, but please take a look at
>> http://s.apache.org/splittable-do-fn - it unifies the concepts of
>> bounded and unbounded sources, and the use case you mentioned is one of the
>> motivating examples.
>>
>> Also, see recent discussions on pipeline termination semantics:
>> technically nothing should prevent an unbounded source from saying it's
>> done "for real" (no new data will appear), just the current UnboundedSource
>> API does not expose such a method. (but Splittable DoFn does)
>>
>> On Sat, Apr 22, 2017 at 11:15 AM peay <[email protected]> wrote:
>>
>>> Hello,
>>>
>>> A use case I find myself running into frequently is the following: I
>>> have daily or hourly files, and a Beam pipeline with small to moderate
>>> size windows. (Actually, I've just seen that support for per-window files
>>> in file-based sinks was recently checked in, which is one way to
>>> get there).
>>>
>>> Now, Beam has no clue about the fact that each file corresponds to a
>>> given time interval. My understanding is that when running the pipeline in
>>> batch mode with a bounded source, there is no notion of a watermark and we have
>>> to load everything because we just don't know. This is pretty wasteful,
>>> especially as you have to keep a lot of data in memory, while you could in
>>> principle operate close to what you'd do in streaming mode: first read the
>>> oldest files, then newest files, moving the watermark forward as you go
>>> through the input list of files.
>>>
>>> I see one way around this. Let's say that I have hourly files and let's
>>> not assume anything about the order of records within the file to keep it
>>> simple: I don't want a very precise record-level watermark, but more a
>>> rough watermark at the granularity of hours. Say we can easily get the
>>> corresponding time interval from the filename. One can make an unbounded
>>> source that essentially acts as a "List of bounded file-based sources". If
>>> there are K splits, split k can read every file that has `index % K == k`
>>> in the time-ordered list of files. `advance` can advance the current file,
>>> and move on to the next one if no records were read.
>>>
>>> However, as far as I understand, this pipeline will never terminate
>>> since this is an unbounded source and having the `advance` method of our
>>> wrapping source return `false` won't make the pipeline terminate. Can
>>> someone confirm if this is correct? If yes, what would be ways to work
>>> around that? There's always the option to throw to make the pipeline fail,
>>> but this is far from ideal.
>>>
>>> Thanks,
>>>
>>
>>
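
The round-robin assignment from the original question (split k reads every
file with `index % K == k` in the time-ordered list) can be sketched in plain
Python. This is only an illustration under stated assumptions: it does not use
any Beam API, the names `assign_files` and `read_split` are hypothetical, and
the "watermark" is simply the start hour parsed from each file, mirroring the
`outputElement(01:00 file), updateWatermark(01:00)` pattern asked about above.

```python
from datetime import datetime


def assign_files(files, num_splits):
    """Hypothetical helper: split k gets every file with index % K == k.

    `files` is a list of (start_time, filename) tuples; sorting the tuples
    orders them by start time first.
    """
    ordered = sorted(files)
    return [ordered[k::num_splits] for k in range(num_splits)]


def read_split(split_files):
    """Hypothetical helper: walk one split oldest-first.

    Yields (filename, watermark) pairs, where the coarse hour-level
    watermark is the start time of the file just emitted.
    """
    for start_time, filename in split_files:
        yield filename, start_time


# Five hourly files distributed over K = 2 splits.
hourly_files = [
    (datetime(2017, 4, 22, hour), "events-%02d.log" % hour) for hour in range(5)
]
splits = assign_files(hourly_files, 2)
for k, split_files in enumerate(splits):
    for filename, watermark in read_split(split_files):
        print(k, filename, watermark.isoformat())
```

Each split advances its own hour-level watermark; how a runner would combine
the per-split values into a pipeline-wide watermark is outside this sketch.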
