You could consider using Aggregators or Metrics (Metrics are still
experimental and currently only supported by the Direct and Spark runners).

Simply add a DoFn that reports to the Aggregator - see here
<https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java#L108>
for how to use Aggregators in a DoFn.
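
For example, a minimal sketch of the Metrics flavor (the class name
MyCountingFn and the "mylib" namespace are just placeholders):

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn that counts every element it processes.
public class MyCountingFn extends DoFn<String, String> {
  private final Counter elements = Metrics.counter("mylib", "elements");

  @ProcessElement
  public void processElement(ProcessContext c) {
    elements.inc();          // report to the metric
    c.output(c.element());   // pass the element through unchanged
  }
}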
Then query the result from the PipelineResult:
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L75>
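
Roughly, something like this (accessor names have shifted a bit between
Beam releases, e.g. counters() vs. getCounters(), so adjust to the version
you're on):

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

PipelineResult result = pipeline.run();
result.waitUntilFinish();

// Look up the counter reported by the DoFn above.
MetricQueryResults metrics = result.metrics().queryMetrics(
    MetricsFilter.builder()
        .addNameFilter(MetricNameFilter.named("mylib", "elements"))
        .build());

for (MetricResult<Long> counter : metrics.getCounters()) {
  System.out.println(counter.getName() + ": " + counter.getAttempted());
}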

Would this work for your use case?


On Mon, Feb 20, 2017 at 12:17 PM Antony Mayi <[email protected]> wrote:

>
> Thanks Amit,
>
> I fully understand the controversy of trying to collect Big data into
> local memory... But let's say the data is the result of some reduce
> operation, so driver OOM is not a problem, further processing needs to
> continue in the driver, and getting it there via Kafka is overkill (i.e.
> the system would otherwise not use Kafka at all, so this would mean a new
> dependency). I get the point that I could implement all the rest on the
> PCollection, but once a (significant) part of the pipeline doesn't need
> the big-data/map-reduce tool-set, it would just be way easier to implement
> it locally.
>
> Antony.
> On Monday, 20 February 2017, 10:53, Amit Sela <[email protected]>
> wrote:
>
>
> Hi Antony,
>
> Generally, a PCollection is a distributed bag of elements, just like a
> Spark RDD (for batch).
> Assuming you have a distributed collection, you probably wouldn't want to
> materialize it locally, and even if it's a global count (result) of some
> kind (so there is no risk of OOM in your "driver"), you'd probably still
> want to write it to a Sink of some kind - Kafka, HDFS, etc.
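>
> For example, a rough sketch of writing a global count out as text (the
> variable name and output path are placeholders, and TextIO's API differs
> slightly across Beam versions):
>
> collection
>     .apply(Count.<String>globally())       // PCollection<Long>
>     .apply(ToString.elements())            // PCollection<String>
>     .apply(TextIO.write().to("hdfs://path/to/output"));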
>
> I'm curious how you would use "collect()" or materialize the PCollection
> in the driver program? What did you have in mind?
>
> You can implement a custom Sink - the Spark runner has its own ConsoleIO
> to print to the screen using Spark's print(), but I use it for dev
> iterations and it clearly works only with the Spark runner.
>
> Amit
>
>
> On Mon, Feb 20, 2017 at 11:40 AM Jean-Baptiste Onofré <[email protected]>
> wrote:
>
> Hi Antony,
>
> The Spark runner deals with caching/persisting for you (by analyzing how
> many times the same PCollection is used).
>
> For the collect(), I don't fully understand your question.
>
> If you want to process the elements in the PCollection, you can do a
> simple ParDo:
>
> .apply(ParDo.of(new DoFn<String, Void>() {  // substitute your element type for String
>    @ProcessElement
>    public void processElement(ProcessContext context) {
>      String element = context.element();
>      // do whatever you want
>    }
> }));
>
> Is it not what you want ?
>
> Regards
> JB
>
> On 02/20/2017 10:30 AM, Antony Mayi wrote:
> > Hi,
> >
> > what is the best way to fetch the content of a PCollection into local
> > memory/process (something like calling .collect() on a Spark RDD)? Do I
> > need to implement a custom Sink?
> >
> > Thanks for any clues,
> > Antony.
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>
>
