In a bounded pipeline, "GroupByKey waits for all data to arrive, and no output files appear until the end" is expected. Runners like Dataflow, Spark, etc. process the transform tree in topological order. In your pipeline this means that all of the data is first read and written into the GroupByKey. Once that is complete, all of the data is read back from the GroupByKey and then written to the output files. This is done because it is very efficient for batch execution.
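As a rough sketch of what that means (the data and step names below are made up for illustration, not taken from this thread), a batch runner effectively runs everything up to and including the GroupByKey first, and only then runs everything downstream of it:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    grouped = (
        pipeline
        # Stage 1: read all of the input and write it into the GroupByKey.
        | 'read' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])
        | 'groupbykey' >> beam.GroupByKey())
    (
        grouped
        # Stage 2: only starts once the GroupByKey has received every element.
        | 'format' >> beam.Map(lambda kv: '%s: %s' % (kv[0], sorted(kv[1])))
        | 'write' >> beam.Map(print))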
If you treat this as an unbounded pipeline, then all transforms run in parallel and partial progress is made continuously, but this is significantly less efficient than the batch approach above.

On Fri, Aug 24, 2018 at 12:39 AM Tóth Andor <andor.t...@centralmediacsoport.hu> wrote:

> Yes, I did. The same happens as currently with windowing. GroupByKey waits
> for all data to arrive, and no output files appear until the end.
>
> Maybe it's not a memory issue and Beam sorts this out with temporary
> files. But if there's a problem in thousands of files and such an amount
> of data, then I won't be able to fix and resume; already preprocessed data
> in temporary files is lost. Also, I cannot really see the progress.
>
> Meanwhile I realized that there is something wrong with the
> AddTimestampDoFn: if I collect its output into a list on a small sample,
> the items are not TimestampedValue objects, so windowing could not occur
> either. I suppose that's because the PCollection I'm reading from is not
> unbounded, so the timestamp is thrown away. It may sound like nonsense if
> you know Beam well, but that's my best guess now.
>
> I have found a possible workaround: push the file contents into PubSub,
> then read the data from there with Beam/Dataflow. Dataflow even has a
> template for that called "GCS Text to Cloud PubSub". Still, I can't
> believe that there's no simpler and more elegant way to solve this.
>
> --
> Bests,
> Andor
>
> *From:* Lukasz Cwik [mailto:lc...@google.com]
> *Sent:* Friday, August 24, 2018 00:10
> *To:* user@beam.apache.org
> *Subject:* Re: Window bounded to unbounded PCollection
>
> It is unclear what the purpose of windowing is, since windowing doesn't
> impact memory utilization much on Google Dataflow.
>
> The Direct runner uses memory proportional to the amount of data
> processed, but the Google Dataflow runner does not. The Google Dataflow
> runner's memory usage is proportional to the size of your elements. You
> can reach out to Google Cloud Platform support with some job ids or
> additional questions if you want.
>
> Have you tried a pipeline like this (without windowing):
>
> lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)
>
> results = lines \
>     | 'groupbykey' >> beam.GroupByKey() \
>     | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
>         'too_few_rows', 'invalid_rows', 'missing_recipients', main='main_valid')
>
> output = results['main_valid'] \
>     | 'format' >> beam.Map(output_format) \
>     | 'write' >> beam.io.WriteToText(known_args.output, file_name_suffix=".gz")
>
> On Thu, Aug 23, 2018 at 5:32 AM Tóth Andor <andor.t...@centralmediacsoport.hu> wrote:
>
> Hello,
>
> I'm trying to process a few terabytes of mail logs without using too much
> memory, by windowing the bounded source into an unbounded one.
> Still, the GroupByKey waits for all data to arrive. Can you give me hints
> how to work around this?
>
> I have already searched and read the available manuals and documentation,
> but I still don't have a clue.
> Neither the Direct nor the Google Dataflow runner works.
>
> I'm using Python.
> Every item gets a timestamp, and then they are sorted into sliding
> windows by the following code:
>
> lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)
>
> window_trigger = trigger.AfterWatermark()
> sliding_window = beam.window.SlidingWindows(size=3600+600, period=3600)
>
> windowed_lines = lines \
>     | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) \
>     | 'window' >> beam.WindowInto(sliding_window, trigger=window_trigger,
>         accumulation_mode=trigger.AccumulationMode.DISCARDING)
>
> results = windowed_lines \
>     | 'groupbykey' >> beam.GroupByKey() \
>     | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
>         'too_few_rows', 'invalid_rows', 'missing_recipients', main='main_valid')
>
> output = results['main_valid'] \
>     | 'format' >> beam.Map(output_format) \
>     | 'write' >> beam.io.WriteToText(known_args.output, file_name_suffix=".gz")
>
> --
> Bests,
> Andor
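For reference, a minimal sketch of the kind of timestamp-assigning DoFn discussed above. The class name mirrors AddTimestampDoFn from the thread, but the body and the timestamp-parsing format are illustrative assumptions rather than the original code; the key point is that the DoFn must yield TimestampedValue objects for the downstream WindowInto to see the new timestamps.

import time
import apache_beam as beam

class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        # Illustrative assumption: each log line starts with a timestamp such
        # as "2018-08-23 12:39:00"; adjust the parsing to the real log format.
        ts = time.mktime(time.strptime(element[:19], '%Y-%m-%d %H:%M:%S'))
        # Yield a TimestampedValue so WindowInto sees the event-time timestamp;
        # yielding the bare element would keep the timestamp assigned by the source.
        yield beam.window.TimestampedValue(element, ts)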