Hi,

The PR is ready and I'm just struggling with the setup of tests - Dataflow ValidatesRunner tests currently don't have a streaming execution. I think +Kenn Knowles <[email protected]> was doing something about that, or I might find a workaround.
But basically, if you want to experiment: if you patch in the PR, you can try SDF in Dataflow in streaming mode. It passes tests against the current production Dataflow Service.

On Thu, Jun 15, 2017 at 8:54 AM peay <[email protected]> wrote:

> Eugene, would you have an ETA on when splittable DoFn would be available
> in Dataflow in batch/streaming mode? I see that
> https://github.com/apache/beam/pull/1898 is still active.
>
> I've started to experiment with those using the DirectRunner and this is a
> great API.
>
> Thanks!
>
> -------- Original Message --------
> Subject: Re: Using watermarks with bounded sources
> Local Time: April 23, 2017 10:18 AM
> UTC Time: April 23, 2017 2:18 PM
> From: [email protected]
> To: Eugene Kirpichov <[email protected]>
> [email protected] <[email protected]>
>
> Ah, I didn't know about that. This is *really* great -- from a quick look,
> the API looks both very natural and very powerful. Thanks a lot for getting
> this into Beam!
>
> I see Flink support seems to have been merged already. Any idea on when
> https://github.com/apache/beam/pull/1898 will get merged?
>
> I see updateWatermark in the API but not in the proposal's examples, which
> only use resume/withFutureOutputWatermark. Any reason why updateWatermark
> is not called after each output in the examples from the proposal? I guess
> that would be "too fine-grained" to update it for each individual record
> of a mini-batch?
>
> In my case with existing hourly files, would `outputElement(01:00 file),
> updateWatermark(01:00), outputElement(02:00), updateWatermark(02:00), ...`
> be the proper way to output per-hour elements while gradually moving the
> watermark forward while going through an existing list? Or would you
> instead suggest to still use resume (potentially with very small timeouts)?
>
> Thanks,
>
> -------- Original Message --------
> Subject: Re: Using watermarks with bounded sources
> Local Time: 22 April 2017 3:59 PM
> UTC Time: 22 April 2017 19:59
> From: [email protected]
> To: peay <[email protected]>, [email protected] <[email protected]>
>
> Hi! This is an excellent question; don't have time to reply in much detail
> right now, but please take a look at http://s.apache.org/splittable-do-fn -
> it unifies the concepts of bounded and unbounded sources, and the use case
> you mentioned is one of the motivating examples.
>
> Also, see recent discussions on pipeline termination semantics:
> technically nothing should prevent an unbounded source from saying it's
> done "for real" (no new data will appear), just the current UnboundedSource
> API does not expose such a method. (but Splittable DoFn does)
>
> On Sat, Apr 22, 2017 at 11:15 AM peay <[email protected]> wrote:
>
>> Hello,
>>
>> A use case I find myself running into frequently is the following: I have
>> daily or hourly files, and a Beam pipeline with small to moderate size
>> windows. (Actually, I've just seen that support for per-window files in
>> file-based sinks was recently checked in, which is one way to get there).
>>
>> Now, Beam has no clue about the fact that each file corresponds to a
>> given time interval. My understanding is that when running the pipeline in
>> batch mode with a bounded source, there is no notion of a watermark and we
>> have to load everything because we just don't know. This is pretty wasteful,
>> especially as you have to keep a lot of data in memory, while you could in
>> principle operate close to what you'd do in streaming mode: first read the
>> oldest files, then the newest files, moving the watermark forward as you go
>> through the input list of files.
>>
>> I see one way around this. Let's say that I have hourly files and let's
>> not assume anything about the order of records within the file to keep it
>> simple: I don't want a very precise record-level watermark, but more a
>> rough watermark at the granularity of hours. Say we can easily get the
>> corresponding time interval from the filename. One can make an unbounded
>> source that essentially acts as a "List of bounded file-based sources". If
>> there are K splits, split k can read every file that has `index % K == k`
>> in the time-ordered list of files. `advance` can advance the current file,
>> and move on to the next one if no records were read.
>>
>> However, as far as I understand, this pipeline will never terminate since
>> this is an unbounded source and having the `advance` method of our wrapping
>> source return `false` won't make the pipeline terminate. Can someone
>> confirm if this is correct? If yes, what would be ways to work around that?
>> There's always the option to throw to make the pipeline fail, but this is
>> far from ideal.
>>
>> Thanks,
>>
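
For readers following the thread, here is a minimal sketch of the two ideas discussed above: assigning time-ordered files to K splits with `index % K == k`, and emitting one file's elements followed by an hour-granularity watermark update. It is plain Python, not Beam code. The filename layout (`events-YYYY-MM-DDTHH.log`) and all function names are assumptions made up for illustration, and the "watermark" events only mimic the `outputElement(...)`, `updateWatermark(...)` sequence from the question.

```python
from datetime import datetime

# Hypothetical filename layout, assumed for illustration only:
# "events-YYYY-MM-DDTHH.log", where the timestamp is the start of the hour.
PREFIX, SUFFIX = "events-", ".log"


def hour_from_filename(name):
    """Parse the start of the hour covered by a file from its name."""
    stamp = name[len(PREFIX):-len(SUFFIX)]
    return datetime.strptime(stamp, "%Y-%m-%dT%H")


def assign_files_to_splits(filenames, num_splits):
    """Give split k every file whose index in the time-ordered list
    satisfies index % num_splits == k."""
    ordered = sorted(filenames, key=hour_from_filename)
    return [ordered[k::num_splits] for k in range(num_splits)]


def events_for_split(split_files):
    """Yield ("element", filename) followed by ("watermark", hour) for each
    file, oldest first, so the watermark only ever moves forward."""
    for name in split_files:
        yield ("element", name)
        yield ("watermark", hour_from_filename(name))


# Four hourly files, deliberately listed out of order.
files = ["events-2017-04-22T%02d.log" % h for h in (3, 0, 2, 1)]
splits = assign_files_to_splits(files, 2)
split0_events = list(events_for_split(splits[0]))
```

With two splits, split 0 ends up with the 00:00 and 02:00 files and split 1 with the 01:00 and 03:00 files. Whether a runner terminates once every split is exhausted is exactly the open question in the thread, and this sketch does not address it.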
