The Python SDK has only a limited set of sources that can be used to create
an unbounded pipeline. The limitations are documented here:
https://beam.apache.org/documentation/sdks/python-streaming/

To achieve what you're looking for, use ReadFromPubSub followed by a ParDo
that reads each filename and parses the records you want. You will also
need to write a ParDo that outputs the data to files.


On Fri, Aug 24, 2018 at 11:47 AM Tóth Andor <
andor.t...@centralmediacsoport.hu> wrote:

> I can deal with it being less efficient if it's more transparent :). I want
> to achieve the latter, unbounded scenario. How can that be done?
>
>
>
> --
>
> Bests,
>
> Andor
>
>
>
> *From:* Lukasz Cwik [mailto:lc...@google.com]
> *Sent:* Friday, August 24, 2018 19:58
> *To:* user@beam.apache.org
> *Subject:* Re: Window bounded to unbounded PCollection
>
>
>
> In a bounded pipeline "GroupByKey waits for all data to arrive, and no
> output files appear until the end." is expected. Runners like Dataflow,
> Spark, ... process the transform tree in topological order. In your
> pipeline, this would mean that all of the data is read and written to the
> group by key. Once that is complete, all of the data is read from the group
> by key and then written to the output files. This is done because it is
> very efficient.
>
>
>
> If you treat this as an unbounded pipeline, then all transforms run in
> parallel and partial progress is made continuously, but this is
> significantly less efficient than the above.
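>
> For reference, a minimal sketch of what "treating this as an unbounded
> pipeline" means on the options side, using the Python SDK's standard
> streaming flag (in practice this also requires an unbounded source such
> as Pub/Sub to feed it):
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>
> options = PipelineOptions()
> # Ask the runner for streaming execution: transforms run concurrently
> # and emit partial results instead of stage-by-stage batch execution.
> options.view_as(StandardOptions).streaming = True
> pipeline = beam.Pipeline(options=options)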
>
>
>
> On Fri, Aug 24, 2018 at 12:39 AM Tóth Andor <
> andor.t...@centralmediacsoport.hu> wrote:
>
> Yes, I did. The same thing happens as with windowing: GroupByKey waits
> for all data to arrive, and no output files appear until the end.
>
> Maybe it's not a memory issue and Beam handles this with temporary files.
> But if a problem occurs somewhere in those thousands of files and that
> amount of data, I won't be able to fix it and resume; the already
> preprocessed data in temporary files is lost. Also, I cannot really see
> the progress.
>
>
>
> Meanwhile I realized that something is off with the AddTimestampDoFn:
> if I collect its output into a list on a small sample, the elements are
> not TimestampedValue objects, so windowing could not happen either. I
> suppose that's because the PCollection I'm reading from is not unbounded,
> so the timestamp is thrown away. That may sound like nonsense if you know
> Beam well, but it's my best guess right now.
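>
> For context, the timestamping DoFn referred to here typically follows the
> pattern below (a sketch; extract_unix_timestamp stands in for whatever
> parses the timestamp out of a log line):
>
> import apache_beam as beam
>
> class AddTimestampDoFn(beam.DoFn):
>     def process(self, element):
>         # Wrap the element in a TimestampedValue; Beam uses it to set the
>         # element's timestamp metadata, so downstream elements appear as
>         # plain values rather than TimestampedValue objects.
>         unix_ts = extract_unix_timestamp(element)  # placeholder parser
>         yield beam.window.TimestampedValue(element, unix_ts)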
>
>
>
> I have found a possible workaround: push the file contents into Pub/Sub
> and then read the data from there with Beam/Dataflow. Dataflow even has a
> template for that, called "GCS Text to Cloud PubSub". Still, I can't
> believe there is no simpler and more elegant way to solve this.
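>
> (For the record, the read side of that workaround would look roughly like
> the sketch below; the topic name is a placeholder, and the windowing
> mirrors the pipeline further down.)
>
> lines = pipeline \
>     | 'read' >> beam.io.ReadFromPubSub(
>           topic='projects/<project>/topics/<mail-log-lines>') \
>     | 'decode' >> beam.Map(lambda msg: msg.decode('utf-8'))
>
> windowed_lines = lines \
>     | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) \
>     | 'window' >> beam.WindowInto(
>           beam.window.SlidingWindows(size=3600 + 600, period=3600),
>           trigger=trigger.AfterWatermark(),
>           accumulation_mode=trigger.AccumulationMode.DISCARDING)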
>
>
>
> --
>
> Bests,
>
> Andor
>
>
>
> *From:* Lukasz Cwik [mailto:lc...@google.com]
> *Sent:* Friday, August 24, 2018 00:10
> *To:* user@beam.apache.org
> *Subject:* Re: Window bounded to unbounded PCollection
>
>
>
> It is unclear what the purpose of windowing is since windowing doesn't
> impact memory utilization much on Google Dataflow.
>
>
>
> The Direct runner uses memory proportional to the amount of data processed
> but the Google Dataflow runner does not. The Google Dataflow runner's
> memory usage is proportional to the size of your elements. You can reach
> out to Google Cloud Platform support with some job ids or additional
> questions if you want.
>
>
>
> Have you tried a pipeline like the following (without windowing):
>
>
>
> lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)
>
> results = lines \
>     | 'groupbykey' >> beam.GroupByKey() \
>     | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
>           'too_few_rows', 'invalid_rows', 'missing_recipients',
>           main='main_valid')
>
> output = results['main_valid'] \
>     | 'format' >> beam.Map(output_format) \
>     | 'write' >> beam.io.WriteToText(known_args.output,
>           file_name_suffix=".gz")
>
>
>
>
>
> On Thu, Aug 23, 2018 at 5:32 AM Tóth Andor <
> andor.t...@centralmediacsoport.hu> wrote:
>
> Hello,
>
> I'm trying to process a few terabytes of mail logs without using too much
> memory, by windowing the bounded source into an unbounded one.
> Still, the GroupByKey waits for all data to arrive. Can you give me hints
> on how to work around this?
>
> I have already searched and read the available manuals and documentation,
> but I still don't have a clue.
> Neither the Direct nor the Google Dataflow runner works.
>
> I'm using Python. Every item gets a timestamp and is then assigned to
> sliding windows by the following code:
>
> lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)
>
> window_trigger = trigger.AfterWatermark()
> sliding_window = beam.window.SlidingWindows(size=3600 + 600, period=3600)
>
> windowed_lines = lines \
>     | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) \
>     | 'window' >> beam.WindowInto(sliding_window, trigger=window_trigger,
>           accumulation_mode=trigger.AccumulationMode.DISCARDING)
>
> results = windowed_lines \
>     | 'groupbykey' >> beam.GroupByKey() \
>     | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
>           'too_few_rows', 'invalid_rows', 'missing_recipients',
>           main='main_valid')
>
> output = results['main_valid'] \
>     | 'format' >> beam.Map(output_format) \
>     | 'write' >> beam.io.WriteToText(known_args.output,
>           file_name_suffix=".gz")
>
> --
> Bests,
> Andor
>
>
