Hello,
I'm trying to process a few terabytes of mail logs without using too much
memory, by windowing the bounded source so it behaves like an unbounded one.
Still, the GroupByKey waits for all the data to arrive. Can you give me some
hints on how to work around this?
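I wondered whether an early-firing trigger along these lines would help (just
a sketch based on the trigger docs; the 60-second delay is an arbitrary guess),
but I'm not sure it even applies to a bounded source:

    from apache_beam.transforms import trigger

    # Fire an early pane roughly every 60 s of processing time, then the
    # final pane when the watermark passes the end of the window.
    window_trigger = trigger.AfterWatermark(
        early=trigger.AfterProcessingTime(60))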
I have already searched and read the available manuals and documentation, but
I still don't have a clue. Neither the Direct runner nor the Google Dataflow
runner works.
I'm using Python. Every item gets a timestamp and is then assigned to sliding
windows by the following code:
    import apache_beam as beam
    from apache_beam.transforms import trigger

    lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)

    window_trigger = trigger.AfterWatermark()
    sliding_window = beam.window.SlidingWindows(size=3600 + 600, period=3600)

    windowed_lines = lines \
        | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) \
        | 'window' >> beam.WindowInto(
            sliding_window,
            trigger=window_trigger,
            accumulation_mode=trigger.AccumulationMode.DISCARDING)

    results = windowed_lines \
        | 'groupbykey' >> beam.GroupByKey() \
        | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
            'too_few_rows', 'invalid_rows', 'missing_recipients',
            main='main_valid')

    output = results['main_valid'] \
        | 'format' >> beam.Map(output_format) \
        | 'write' >> beam.io.WriteToText(known_args.output,
                                         file_name_suffix=".gz")
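For reference, AddTimestampDoFn does roughly this (simplified sketch;
parse_ts() and key_of() are placeholders standing in for the actual log
parsing and key extraction):

    import apache_beam as beam
    from apache_beam.transforms import window

    class AddTimestampDoFn(beam.DoFn):
        def process(self, element):
            # parse_ts() is a placeholder returning the event time of the
            # log line as epoch seconds.
            ts = parse_ts(element)
            # Emit a (key, line) pair stamped with the event time, so the
            # downstream GroupByKey has something to group on.
            yield window.TimestampedValue((key_of(element), element), ts)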
--
Best,
Andor