Hi! This is an excellent question; I don't have time to reply in much detail
right now, but please take a look at http://s.apache.org/splittable-do-fn -
it unifies the concepts of bounded and unbounded sources, and the use case
you mentioned is one of the motivating examples.

Also, see the recent discussions on pipeline termination semantics: technically
nothing should prevent an unbounded source from saying it's done "for real"
(no new data will appear); it's just that the current UnboundedSource API does
not expose such a method, while Splittable DoFn does.
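
To make that concrete, here is a very rough sketch of what reading hourly
files as a Splittable DoFn could look like. This is only an illustration:
the annotations follow the SDF API of more recent Beam Java SDKs, and the
firstHour / lastHour / hourFileExists / readRecords helpers are hypothetical,
not Beam API. Returning resume() lets it wait for files that haven't landed
yet, and returning stop() once the whole hour range has been claimed is
exactly the "no new data will appear" signal that UnboundedSource can't
express:

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Duration;

// Sketch only: annotations follow the SDF API of more recent Beam Java SDKs,
// and the private helpers at the bottom are hypothetical, not Beam API.
@DoFn.UnboundedPerElement
class ReadHourlyFilesFn extends DoFn<String, String> {

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String baseDir) {
    // Hour indices to read, derived from the existing/expected file names.
    return new OffsetRange(firstHour(baseDir), lastHour(baseDir) + 1);
  }

  @ProcessElement
  public ProcessContinuation process(
      @Element String baseDir,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    for (long hour = tracker.currentRestriction().getFrom(); ; ++hour) {
      if (hour < tracker.currentRestriction().getTo()
          && !hourFileExists(baseDir, hour)) {
        // This hour's file hasn't arrived yet: checkpoint and poll again later.
        return ProcessContinuation.resume().withResumeDelay(Duration.standardMinutes(1));
      }
      if (!tracker.tryClaim(hour)) {
        // Every hour in the range has been claimed: no new data will appear,
        // so this read is done "for real" and the pipeline can finish.
        return ProcessContinuation.stop();
      }
      for (String record : readRecords(baseDir, hour)) {
        out.output(record);
      }
    }
  }

  // Hypothetical helpers (file listing, name parsing, reading), not Beam API.
  private long firstHour(String baseDir) { throw new UnsupportedOperationException(); }
  private long lastHour(String baseDir) { throw new UnsupportedOperationException(); }
  private boolean hourFileExists(String baseDir, long hour) { throw new UnsupportedOperationException(); }
  private Iterable<String> readRecords(String baseDir, long hour) { throw new UnsupportedOperationException(); }
}

The SDF proposal also covers reporting watermarks from within process(), so
the coarse per-hour watermark described in the question below fits there too.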

On Sat, Apr 22, 2017 at 11:15 AM peay <[email protected]> wrote:

> Hello,
>
> A use case I find myself running into frequently is the following: I have
> daily or hourly files, and a Beam pipeline with small- to moderate-size
> windows. (Actually, I've just seen that support for per-window files in
> file-based sinks was recently checked in, which is one way to get there.)
>
> Now, Beam has no clue about the fact that each file corresponds to a given
> time interval. My understanding is that when running the pipeline in batch
> mode with a bounded source, there is no notion of a watermark, and we have
> to load everything because we just don't know. This is pretty wasteful,
> especially as you have to keep a lot of data in memory, while you could in
> principle operate close to what you'd do in streaming mode: first read the
> oldest files, then the newer ones, moving the watermark forward as you go
> through the input list of files.
>
> I see one way around this. Let's say that I have hourly files and, to keep
> it simple, let's not assume anything about the order of records within a
> file: I don't want a very precise record-level watermark, but rather a
> rough watermark at the granularity of hours. Say we can easily get the
> corresponding time interval from the filename. One can make an unbounded
> source that essentially acts as a "list of bounded file-based sources". If
> there are K splits, split k can read every file that has `index % K == k`
> in the time-ordered list of files. `advance` can advance within the current
> file, and move on to the next one once no more records can be read from it.
>
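
(To make the scheme above concrete: the bookkeeping it implies boils down to
something like the plain-Java sketch below, i.e. the `index % K == k`
assignment over the time-ordered file list plus a coarse hour-level watermark
parsed from the file name. The yyyyMMddHH naming pattern and all names here
are illustrative assumptions, not Beam API.)

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HourlyFileAssignment {

  // Assumes file names embed the hour as yyyyMMddHH, e.g. "events-2017042211.avro".
  private static final Pattern HOUR_IN_NAME =
      Pattern.compile("(\\d{4})(\\d{2})(\\d{2})(\\d{2})");

  /** Files assigned to split k of K: every file whose index satisfies index % K == k. */
  static List<String> filesForSplit(List<String> timeOrderedFiles, int k, int K) {
    List<String> assigned = new ArrayList<>();
    for (int index = 0; index < timeOrderedFiles.size(); index++) {
      if (index % K == k) {
        assigned.add(timeOrderedFiles.get(index));
      }
    }
    return assigned;
  }

  /** Coarse watermark: the start of the hour encoded in the file name. */
  static Instant hourWatermark(String filename) {
    Matcher m = HOUR_IN_NAME.matcher(filename);
    if (!m.find()) {
      throw new IllegalArgumentException("No yyyyMMddHH timestamp in " + filename);
    }
    return LocalDateTime.of(
            Integer.parseInt(m.group(1)),  // year
            Integer.parseInt(m.group(2)),  // month
            Integer.parseInt(m.group(3)),  // day
            Integer.parseInt(m.group(4)),  // hour
            0)                             // minute
        .toInstant(ZoneOffset.UTC);
  }
}

Each split's reader would then walk its assigned files oldest first and,
while reading a given file, report the start of that file's hour as its
watermark.
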
> However, as far as I understand, this pipeline will never terminate: since
> this is an unbounded source, having the `advance` method of our wrapping
> source return `false` won't make the pipeline terminate. Can someone
> confirm whether this is correct? If yes, what would be ways to work around
> it? There's always the option to throw an exception to make the pipeline
> fail, but that is far from ideal.
>
> Thanks,
>
