It may also be worth looking at how Scio implements this. The Scio version based on Beam is here: https://github.com/spotify/scio/tree/apache-beam
And they have given some good talks: https://www.slideshare.net/sinisalyh/scio-a-scala-api-for-google-cloud-dataflow-apache-beam

I believe the "closeAndCollect" operator in Scio is the one Amit is discussing.

On Mon, Feb 20, 2017 at 2:32 AM, Amit Sela <[email protected]> wrote:

> You could consider using Aggregators or Metrics (Metrics are still
> experimental and currently only supported by the Direct and Spark runners).
>
> Simply add a DoFn that reports to the Aggregator - see here
> <https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java#L108>
> for how to use Aggregators in a DoFn.
> Then query
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L75>
> the result in the PipelineResult.
>
> Would this work for your use case?
>
> On Mon, Feb 20, 2017 at 12:17 PM Antony Mayi <[email protected]> wrote:
>
>> Thanks Amit,
>>
>> I fully understand the controversy of trying to collect Big Data into
>> local memory... But let's say the data is the result of some reduce
>> operation, so driver OOM is not a problem, further processing needs to
>> continue in the driver, and getting it there via Kafka is overkill (i.e.
>> the system would otherwise not use Kafka at all, so this would mean a new
>> dependency). I get the point that I could implement all the rest on the
>> PCollection, but once a (significant) part of the pipeline doesn't need
>> the big-data/map-reduce toolset, it would just be way easier to implement
>> it locally.
>>
>> Antony.
>>
>> On Monday, 20 February 2017, 10:53, Amit Sela <[email protected]> wrote:
>>
>> Hi Antony,
>>
>> Generally, PCollections are a distributed bag of elements, just like
>> Spark RDDs (for batch).
>> Assuming you have a distributed collection, you probably wouldn't want to
>> materialize it locally, and even if it's a global count (result) of some
>> kind (guaranteed to avoid OOM in your "driver"), you'd probably want to
>> write it to a Sink of some kind - Kafka, HDFS, etc.
>>
>> I'm curious how you would use "collect()" or materialize the PCollection
>> in the driver program? What did you have in mind?
>>
>> You can implement a custom Sink - the Spark runner has its own ConsoleIO
>> to print to screen using Spark's print(), but I use it for dev iterations
>> and it clearly works only for the Spark runner.
>>
>> Amit
>>
>> On Mon, Feb 20, 2017 at 11:40 AM Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>> Hi Antony,
>>
>> The Spark runner deals with caching/persist for you (analyzing how many
>> times the same PCollection is used).
>>
>> For the collect(), I don't fully understand your question.
>>
>> If you want to process the elements in the PCollection, you can do a
>> simple ParDo:
>>
>> .apply(ParDo.of(new DoFn<T, Void>() {
>>   @ProcessElement
>>   public void processElement(ProcessContext context) {
>>     T element = context.element();
>>     // do whatever you want
>>   }
>> }))
>>
>> Is that not what you want?
>>
>> Regards
>> JB
>>
>> On 02/20/2017 10:30 AM, Antony Mayi wrote:
>> > Hi,
>> >
>> > what is the best way to fetch the content of a PCollection into local
>> > memory/process (something like calling .collect() on a Spark RDD)? Do I
>> > need to implement a custom Sink?
>> >
>> > Thanks for any clues,
>> > Antony.
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
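For reference, a minimal, self-contained sketch of the Metrics approach Amit describes above, using the Beam Java SDK. The class name, the counter namespace/name ("MetricsQueryExample", "elements"), and the Create.of(...) input are illustrative, not from the thread, and the MetricResult accessors (name()/attempted() here) have been renamed in some Beam releases, so adjust to your SDK version:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class MetricsQueryExample {

  // A DoFn that bumps a counter for every element it processes.
  // The namespace/name pair is illustrative.
  static class CountingFn extends DoFn<String, String> {
    private final Counter elements = Metrics.counter("MetricsQueryExample", "elements");

    @ProcessElement
    public void processElement(ProcessContext c) {
      elements.inc();
      c.output(c.element());
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(Create.of("a", "b", "c"))
     .apply(ParDo.of(new CountingFn()));

    PipelineResult result = p.run();
    result.waitUntilFinish();

    // Query the counter back from the PipelineResult in the driver program.
    MetricQueryResults metrics = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named("MetricsQueryExample", "elements"))
            .build());
    for (MetricResult<Long> counter : metrics.counters()) {
      System.out.println(counter.name() + ": " + counter.attempted());
    }
  }
}

On the Direct runner (or the Spark runner, per Amit's note) this makes the counter value readable in the driver process after the pipeline finishes. Note that Metrics only carry numeric values; for anything richer than counts/sums/distributions you would still need to write to a sink (files, Kafka, etc.) and read it back in the driver, as Amit suggests.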
