You could consider using Aggregators or Metrics (Metrics are still experimental and currently only supported by the Direct and Spark runners).

Simply add a DoFn that reports to the Aggregator - see
<https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java#L108>
for how to use Aggregators in a DoFn - and then query the result from the PipelineResult:
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L75>

Would this work for your use case?
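Roughly along these lines - an untested sketch; the DoFn, the aggregator name, and the exact factory/accessor names (Sum.ofLongs(), getAggregatorValues(), getValues()) are illustrative and may differ between SDK versions:

    static class ReportingFn extends DoFn<String, String> {
      // an Aggregator that sums the number of elements seen by this DoFn
      final Aggregator<Long, Long> seen = createAggregator("seen", Sum.ofLongs());

      @ProcessElement
      public void processElement(ProcessContext c) {
        seen.addValue(1L);        // report to the aggregator
        c.output(c.element());    // pass the element through unchanged
      }
    }

    ReportingFn fn = new ReportingFn();
    input.apply(ParDo.of(fn));    // "input" is whatever PCollection you already have

    PipelineResult result = pipeline.run();
    result.waitUntilFinish();
    // back in the driver program, query the accumulated value:
    AggregatorValues<Long> values = result.getAggregatorValues(fn.seen);
    System.out.println("elements seen: " + values.getValues());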
On Mon, Feb 20, 2017 at 12:17 PM Antony Mayi <[email protected]> wrote:

> Thanks Amit,
>
> I fully understand the controversy of trying to collect big data into
> local memory... But let's say the data is the result of some reduce
> operation, so driver OOM is not a problem, further processing needs to
> continue in the driver, and getting it there via Kafka is overkill (i.e.
> the system would otherwise not use Kafka at all, so this would mean a new
> dependency). I get the point that I could implement all the rest on a
> PCollection, but once a (significant) part of the pipeline doesn't need
> the big-data/map-reduce tool-set, it would just be way easier to
> implement it locally.
>
> Antony.
>
> On Monday, 20 February 2017, 10:53, Amit Sela <[email protected]> wrote:
>
> Hi Antony,
>
> Generally, PCollections are a distributed bag of elements, just like
> Spark RDDs (for batch).
> Assuming you have a distributed collection, you probably wouldn't want to
> materialize it locally, and even if it's a global count (result) of some
> kind (guaranteeing to avoid OOM in your "driver") you'd probably want to
> write it to a Sink of some kind - Kafka, HDFS, etc.
>
> I'm curious how you would use "collect()" or materialize the PCollection
> in the driver program? What did you have in mind?
>
> You can implement a custom Sink - the Spark runner has its own ConsoleIO
> to print to screen using Spark's print(), but I use it for dev iterations
> and it clearly works only for the Spark runner.
>
> Amit
>
> On Mon, Feb 20, 2017 at 11:40 AM Jean-Baptiste Onofré <[email protected]> wrote:
>
> Hi Antony,
>
> The Spark runner deals with caching/persist for you (analyzing how many
> times the same PCollection is used).
>
> For the collect(), I don't fully understand your question.
>
> If you want to process elements in the PCollection, you can do a simple
> ParDo:
>
> .apply(ParDo.of(new DoFn<T, T>() {
>   @ProcessElement
>   public void processElements(ProcessContext context) {
>     T element = context.element();
>     // do whatever you want
>   }
> }));
>
> Is it not what you want?
>
> Regards
> JB
>
> On 02/20/2017 10:30 AM, Antony Mayi wrote:
> > Hi,
> >
> > what is the best way to fetch the content of a PCollection into local
> > memory/process (something like calling .collect() on a Spark RDD)? Do I
> > need to implement a custom Sink?
> >
> > Thanks for any clues,
> > Antony.
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
