Yes, I did. The same thing happens as with windowing: GroupByKey waits for all
data to arrive, and no output files appear until the end.
Maybe it's not a memory issue after all and Beam sorts this out with temporary
files. But if something goes wrong across thousands of files and that amount of
data, I won't be able to fix it and resume; the data already preprocessed into
temporary files is lost. I also can't really see the progress.

Meanwhile I realized that something is off with the AddTimestampDoFn: if I
collect its output into a list on a small sample, the elements are not
TimestampedValue objects, so windowing could not happen either. I suppose
that's because the PCollection I'm reading from is not unbounded, and the
timestamp is therefore thrown away. That may sound like nonsense to someone who
knows Beam well, but it's my best guess for now.
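
For reference, the DoFn is roughly along these lines (a simplified sketch;
parse_log_timestamp is only a placeholder for however the timestamp is actually
extracted from a mail log line):

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue

class AddTimestampDoFn(beam.DoFn):
    # Attach an event timestamp (Unix seconds) to each log line so that
    # windowing can use it later in the pipeline.
    def process(self, element):
        # Placeholder: parse the timestamp out of the mail log line.
        event_time = parse_log_timestamp(element)
        yield TimestampedValue(element, event_time)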

I have found a possible workaround: push the file contents into Pub/Sub and
then read the data from there with Beam/Dataflow. Dataflow even has a template
for that, called "GCS Text to Cloud PubSub". Still, I can't believe there is no
simpler and more elegant way to solve this.
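
Reading from Pub/Sub on the Beam side would then look roughly like this (a
sketch only; the topic name is just an example):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Streaming pipeline consuming what the "GCS Text to Cloud PubSub" template publishes.
options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as pipeline:
    lines = (pipeline
             | 'read' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/mail-logs')
             | 'decode' >> beam.Map(lambda msg: msg.decode('utf-8')))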

--
Bests,
Andor

From: Lukasz Cwik [mailto:lc...@google.com]
Sent: Friday, August 24, 2018 00:10
To: user@beam.apache.org
Subject: Re: Window bounded to unbounded PCollection

It is unclear what the purpose of windowing is since windowing doesn't impact 
memory utilization much on Google Dataflow.

The Direct runner uses memory proportional to the amount of data processed, but
the Google Dataflow runner does not; its memory usage is proportional to the
size of your elements. You can reach out to Google Cloud Platform support with
some job IDs or additional questions if you want.

Have you tried a pipeline like this (without windowing):

lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)

results = lines \
    | 'groupbykey' >> beam.GroupByKey() \
    | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
        'too_few_rows', 'invalid_rows', 'missing_recipients', main='main_valid')

output = results['main_valid'] \
    | 'format' >> beam.Map(output_format) \
    | 'write' >> beam.io.WriteToText(known_args.output, file_name_suffix=".gz")
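
Note that GroupByKey operates on (key, value) pairs, so a keying step is
assumed before it; extract_key below is only a hypothetical illustration, and
the GroupByKey would then consume keyed instead of lines:

keyed = lines | 'key' >> beam.Map(lambda line: (extract_key(line), line))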


On Thu, Aug 23, 2018 at 5:32 AM Tóth Andor
<andor.t...@centralmediacsoport.hu> wrote:
Hello,

I'm trying to process a few terabytes of mail logs without using too much
memory, by windowing the bounded source into an unbounded one.
Still, the GroupByKey waits for all data to arrive. Can you give me hints on
how to work around this?

I have already searched and read the available manuals and documentation, but I
still don't have a clue.
Neither the Direct nor the Google Dataflow runner works.

I'm using Python. Every item gets a timestamp and is then assigned to sliding
windows by the following code:

lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)

window_trigger = trigger.AfterWatermark()
sliding_window = beam.window.SlidingWindows(size=3600+600, period=3600)

windowed_lines = lines \
    | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) \
    | 'window' >> beam.WindowInto(sliding_window, trigger=window_trigger,
                                  accumulation_mode=trigger.AccumulationMode.DISCARDING)

results = windowed_lines \
    | 'groupbykey' >> beam.GroupByKey() \
    | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
        'too_few_rows', 'invalid_rows', 'missing_recipients', main='main_valid')

output = results['main_valid'] \
    | 'format' >> beam.Map(output_format) \
    | 'write' >> beam.io.WriteToText(known_args.output, file_name_suffix=".gz")

--
Bests,
Andor
