It is unclear what purpose the windowing serves here, since windowing doesn't have much impact on memory utilization on Google Cloud Dataflow.
The Direct runner uses memory proportional to the amount of data processed, but the Google Dataflow runner does not: its memory usage is proportional to the size of your individual elements. You can reach out to Google Cloud Platform support with some job IDs or additional questions if you want.

Have you tried a pipeline like this (without windowing)?

    lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)

    results = lines \
        | 'groupbykey' >> beam.GroupByKey() \
        | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
            'too_few_rows', 'invalid_rows', 'missing_recipients',
            main='main_valid')

    output = results['main_valid'] \
        | 'format' >> beam.Map(output_format) \
        | 'write' >> beam.io.WriteToText(known_args.output,
                                         file_name_suffix=".gz")

On Thu, Aug 23, 2018 at 5:32 AM Tóth Andor <andor.t...@centralmediacsoport.hu> wrote:

> Hello,
>
> I'm trying to process a few terabytes of mail logs without using too much
> memory, by windowing the bounded source into an unbounded one. Still, the
> GroupByKey waits for all of the data to arrive. Can you give me hints on
> how to work around this?
>
> I have already searched and read the available manuals and documentation,
> but I still don't have a clue. Neither the Direct nor the Google Dataflow
> runner works.
>
> I'm using Python. Every item gets a timestamp, and then they are sorted
> into sliding windows by the following code:
>
>     lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)
>
>     window_trigger = trigger.AfterWatermark()
>     sliding_window = beam.window.SlidingWindows(size=3600+600, period=3600)
>
>     windowed_lines = lines \
>         | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) \
>         | 'window' >> beam.WindowInto(sliding_window,
>             trigger=window_trigger,
>             accumulation_mode=trigger.AccumulationMode.DISCARDING)
>
>     results = windowed_lines \
>         | 'groupbykey' >> beam.GroupByKey() \
>         | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
>             'too_few_rows', 'invalid_rows', 'missing_recipients',
>             main='main_valid')
>
>     output = results['main_valid'] \
>         | 'format' >> beam.Map(output_format) \
>         | 'write' >> beam.io.WriteToText(known_args.output,
>                                          file_name_suffix=".gz")
>
> --
> Bests,
> Andor
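
For completeness, here is a minimal, self-contained version of the non-windowed pipeline suggested above. Since GroupByKey only operates on (key, value) pairs, a keying step has to run before it; the ExtractKeyDoFn below, its key choice, and the stub ParseSendingsDoFn and output_format bodies are illustrative assumptions, not the code from the thread.

    # Minimal sketch of the suggested non-windowed pipeline.
    # ASSUMPTIONS: ExtractKeyDoFn (and its key choice) plus the stub
    # ParseSendingsDoFn/output_format bodies are stand-ins for the
    # thread's own definitions.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    class ExtractKeyDoFn(beam.DoFn):
        # Hypothetical keying step: GroupByKey needs (key, value) pairs,
        # so emit (first_field, line). Pick whatever field makes sense
        # for the mail logs (e.g. a queue id).
        def process(self, line):
            yield line.split(' ', 1)[0], line

    class ParseSendingsDoFn(beam.DoFn):
        # Stub parser: routes short rows to a tagged side output and
        # passes everything else through as the main output.
        def process(self, element):
            key, lines = element
            for line in lines:
                fields = line.split(' ')
                if len(fields) < 3:
                    yield beam.pvalue.TaggedOutput('too_few_rows', line)
                else:
                    yield key, fields

    def output_format(element):
        key, fields = element
        return '%s,%s' % (key, ','.join(fields))

    def run(input_path, output_path):
        with beam.Pipeline(options=PipelineOptions()) as pipeline:
            lines = pipeline | 'read' >> beam.io.ReadFromText(input_path)
            results = (
                lines
                | 'key' >> beam.ParDo(ExtractKeyDoFn())
                | 'groupbykey' >> beam.GroupByKey()
                | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
                    'too_few_rows', 'invalid_rows', 'missing_recipients',
                    main='main_valid'))
            _ = (results['main_valid']
                 | 'format' >> beam.Map(output_format)
                 | 'write' >> beam.io.WriteToText(output_path,
                                                  file_name_suffix='.gz'))

On the Dataflow runner the grouping is handled by the shuffle rather than held in worker memory, which matches the point above that memory usage scales with element size rather than total input.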