Python has a very limited set of sources that can be used to create an unbounded pipeline. There is some documentation here on the various limitations: https://beam.apache.org/documentation/sdks/python-streaming/
To achieve what you're looking for, use ReadFromPubSub together with a ParDo that reads the file named in each message and parses all the records that you want. You will also need to write a ParDo that outputs the data to files. (A rough sketch of this pattern, and a note on the AddTimestampDoFn question, are at the bottom of this message, below the quoted thread.)

On Fri, Aug 24, 2018 at 11:47 AM Tóth Andor <andor.t...@centralmediacsoport.hu> wrote:

> I can deal with being less efficient if it's more transparent :). I want to
> achieve the latter, unbounded scenario. How can that be done?
>
> --
> Bests,
> Andor
>
> *From:* Lukasz Cwik [mailto:lc...@google.com]
> *Sent:* Friday, August 24, 2018 19:58
> *To:* user@beam.apache.org
> *Subject:* Re: Window bounded to unbounded PCollection
>
> In a bounded pipeline, "GroupByKey waits for all data to arrive, and no
> output files appear until the end" is expected. Runners like Dataflow,
> Spark, ... process the transform tree in topological order. In your
> pipeline this means that all of the data is read and written to the group
> by key; once that is complete, all of the data is read from the group by
> key and then written to the output files. This is done because it is very
> efficient.
>
> If you treat this as an unbounded pipeline, then all transforms run in
> parallel and partial progress is made continuously, but this is
> significantly less efficient than the above.
>
> On Fri, Aug 24, 2018 at 12:39 AM Tóth Andor <andor.t...@centralmediacsoport.hu> wrote:
>
> > Yes, I did. The same happens as currently with windowing. GroupByKey
> > waits for all data to arrive, and no output files appear until the end.
> >
> > Maybe it's not a memory issue and Beam sorts this out with temporary
> > files. But if there's a problem somewhere in thousands of files and that
> > amount of data, then I won't be able to fix and resume, and the already
> > preprocessed data in temporary files is lost. Also, I cannot really see
> > the progress.
> >
> > Meanwhile I realized that there is something off with the
> > AddTimestampDoFn: if I collect its output into a list on a small sample,
> > the elements are not TimestampedValue objects, so windowing could not
> > occur either. I suppose that's because the PCollection I'm reading from
> > is not unbounded, and therefore the timestamp is thrown away. It may
> > sound like nonsense if you know Beam well, but that's my best guess now.
> >
> > I have found a possible workaround: push the file contents into PubSub,
> > then read the data from there with Beam/Dataflow. Dataflow even has a
> > template for that called "GCS Text to Cloud PubSub". Though I can't
> > believe that there's no simpler and more elegant way to solve this.
> >
> > --
> > Bests,
> > Andor
> >
> > *From:* Lukasz Cwik [mailto:lc...@google.com]
> > *Sent:* Friday, August 24, 2018 00:10
> > *To:* user@beam.apache.org
> > *Subject:* Re: Window bounded to unbounded PCollection
> >
> > It is unclear what the purpose of windowing is, since windowing doesn't
> > impact memory utilization much on Google Dataflow.
> >
> > The Direct runner uses memory proportional to the amount of data
> > processed, but the Google Dataflow runner does not: its memory usage is
> > proportional to the size of your elements. You can reach out to Google
> > Cloud Platform support with some job ids or additional questions if you
> > want.
> > Have you tried a pipeline like this (without windowing):
> >
> > lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)
> >
> > results = lines \
> >     | 'groupbykey' >> beam.GroupByKey() \
> >     | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
> >           'too_few_rows', 'invalid_rows', 'missing_recipients',
> >           main='main_valid')
> >
> > output = results['main_valid'] \
> >     | 'format' >> beam.Map(output_format) \
> >     | 'write' >> beam.io.WriteToText(known_args.output,
> >                                      file_name_suffix=".gz")
> >
> > On Thu, Aug 23, 2018 at 5:32 AM Tóth Andor <andor.t...@centralmediacsoport.hu> wrote:
> >
> > Hello,
> >
> > I'm trying to process a few terabytes of mail logs without using too much
> > memory, by windowing the bounded source into an unbounded one. Still, the
> > GroupByKey waits for all data to arrive. Can you give me hints on how to
> > work around this?
> >
> > I have already searched and read the available manuals and documentation,
> > but I still don't have a clue. Neither the Direct nor the Google Dataflow
> > runner works.
> >
> > I'm using Python. Every item gets a timestamp, and then they are sorted
> > into sliding windows by the following code:
> >
> > lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)
> >
> > window_trigger = trigger.AfterWatermark()
> > sliding_window = beam.window.SlidingWindows(size=3600+600, period=3600)
> > windowed_lines = lines \
> >     | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) \
> >     | 'window' >> beam.WindowInto(sliding_window, trigger=window_trigger,
> >           accumulation_mode=trigger.AccumulationMode.DISCARDING)
> >
> > results = windowed_lines \
> >     | 'groupbykey' >> beam.GroupByKey() \
> >     | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
> >           'too_few_rows', 'invalid_rows', 'missing_recipients',
> >           main='main_valid')
> >
> > output = results['main_valid'] \
> >     | 'format' >> beam.Map(output_format) \
> >     | 'write' >> beam.io.WriteToText(known_args.output,
> >                                      file_name_suffix=".gz")
> >
> > --
> > Bests,
> > Andor
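For reference, here is a minimal, untested sketch of the Pub/Sub pattern described at the top of this message. It assumes each Pub/Sub message carries a single file path (for example, published by a small script of your own); the subscription name and the writer DoFn are placeholders, and ParseSendingsDoFn stands in for the parsing DoFn from your existing pipeline:

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import PipelineOptions


class ReadFileRecordsDoFn(beam.DoFn):
    """Opens the file named in a Pub/Sub message and emits its lines."""
    def process(self, message):
        path = message.decode('utf-8').strip()
        # FileSystems.open understands gs:// paths as well as local paths.
        f = FileSystems.open(path)
        try:
            for line in f:
                yield line.decode('utf-8').rstrip('\n')
        finally:
            f.close()


class WriteRecordsDoFn(beam.DoFn):
    """Placeholder writer. WriteToText does not handle unbounded
    PCollections in the Python SDK, so batch the records and write
    them out yourself, e.g. via FileSystems.create()."""
    def process(self, record):
        pass  # replace with real output logic


options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    _ = (p
         | 'read file names' >> beam.io.ReadFromPubSub(
               subscription='projects/YOUR_PROJECT/subscriptions/YOUR_SUB')
         | 'read records' >> beam.ParDo(ReadFileRecordsDoFn())
         | 'parse' >> beam.ParDo(ParseSendingsDoFn())  # your existing DoFn
         | 'write' >> beam.ParDo(WriteRecordsDoFn()))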
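On the AddTimestampDoFn point in the quoted thread: in the Python SDK the usual pattern is for the DoFn to yield TimestampedValue(element, timestamp). The runner attaches the timestamp to the element as metadata, so downstream transforms, and anything you collect into a list, see the plain element again rather than a TimestampedValue wrapper; not seeing TimestampedValue objects in the output does not by itself mean the timestamps were lost. A minimal sketch, assuming (purely for illustration) that the first whitespace-separated field of each log line is an epoch timestamp in seconds:

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue


class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        # Placeholder parsing: adjust to however your mail logs encode time.
        ts = float(element.split(' ', 1)[0])
        yield TimestampedValue(element, ts)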