It may also be worth looking at how Scio implements this. The Scio version
based on Beam is here: https://github.com/spotify/scio/tree/apache-beam

And they have given some good talks.
https://www.slideshare.net/sinisalyh/scio-a-scala-api-for-google-cloud-dataflow-apache-beam
I believe the "closeAndCollect" operator in Scio is the one like Amit is
discussion.
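
For reference, here is roughly what I'd expect the Metrics route Amit
describes below to look like end to end. Treat it as a sketch only: Metrics
are still experimental, the exact method names have moved around a bit
between Beam releases, and the "demo"/"elements-seen" names are just
placeholders.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class MetricsSketch {

  // A pass-through DoFn that reports every element it sees to a counter.
  static class CountingFn extends DoFn<String, String> {
    private final Counter seen = Metrics.counter("demo", "elements-seen");

    @ProcessElement
    public void processElement(ProcessContext c) {
      seen.inc();
      c.output(c.element());
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(Create.of("a", "b", "c")).apply(ParDo.of(new CountingFn()));

    PipelineResult result = p.run();
    result.waitUntilFinish();

    // Back in the driver program, query the counter from the PipelineResult.
    MetricsFilter filter =
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named("demo", "elements-seen"))
            .build();
    for (MetricResult<Long> counter :
        result.metrics().queryMetrics(filter).counters()) {
      System.out.println(counter.name() + " = " + counter.committed());
    }
  }
}

The same shape works with the older Aggregator API that the linked
DebuggingWordCount example uses; only the reporting and querying calls differ.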

On Mon, Feb 20, 2017 at 2:32 AM, Amit Sela <[email protected]> wrote:

> You could consider using Aggregators or Metrics (Metrics are still
> experimental and currently only supported by the Direct and Spark runners).
>
> Simply add a DoFn that reports to the Aggregator - see here
> <https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java#L108>
> for how to use Aggregators in a DoFn.
> Then query
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L75>
> the result in the PipelineResult.
>
> Would this work for your use case?
>
>
> On Mon, Feb 20, 2017 at 12:17 PM Antony Mayi <[email protected]> wrote:
>
>>
>> Thanks Amit,
>>
>> I fully understand the concerns about trying to collect Big Data into
>> local memory... But let's say the data is the result of some reduce
>> operation, so driver OOM is not a problem, further processing needs to
>> continue in the driver, and getting it there via Kafka is overkill (i.e.
>> the system would otherwise not use Kafka at all, so this would mean a new
>> dependency). I get the point that I could implement all the rest on
>> PCollections, but once a (significant) part of the pipeline doesn't need
>> the big-data/map-reduce tool-set, it would just be way easier to implement
>> it locally.
>>
>> Antony.
>> On Monday, 20 February 2017, 10:53, Amit Sela <[email protected]>
>> wrote:
>>
>>
>> Hi Antony,
>>
>> Generally, a PCollection is a distributed bag of elements, just like a
>> Spark RDD (for batch).
>> Assuming you have a distributed collection, you probably wouldn't want to
>> materialize it locally, and even if it's a global count (result) of some
>> kind (so you're guaranteed to avoid OOM in your "driver"), you'd probably
>> still want to write it to a Sink of some kind - Kafka, HDFS, etc.
>>
>> I'm curious how you would use "collect()" or materialize the PCollection
>> in the driver program? What did you have in mind?
>>
>> You can implement a custom Sink - the Spark runner has its own ConsoleIO
>> that prints to screen using Spark's print(), but I use it only for dev
>> iterations and of course it works only with the Spark runner.
>>
>> Amit
>>
>>
>> On Mon, Feb 20, 2017 at 11:40 AM Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>> Hi Antony,
>>
>> The Spark runner deals with caching/persisting for you (by analyzing how
>> many times the same PCollection is used).
>>
>> For the collect(), I don't fully understand your question.
>>
>> If you want to process the elements in the PCollection, you can do a
>> simple ParDo:
>>
>> .apply(ParDo.of(new DoFn<T, Void>() {  // T is your element type
>>    @ProcessElement
>>    public void processElement(ProcessContext context) {
>>      T element = context.element();
>>      // do whatever you want with the element
>>    }
>> }));
>>
>> Is that not what you want?
>>
>> Regards
>> JB
>>
>> On 02/20/2017 10:30 AM, Antony Mayi wrote:
>> > Hi,
>> >
>> > what is the best way to fetch the content of a PCollection into local
>> > memory/process (something like calling .collect() on a Spark RDD)? Do I
>> > need to implement a custom Sink?
>> >
>> > Thanks for any clues,
>> > Antony.
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>>
>>
>>
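
P.S. If what you ultimately need really is a small local collection in the
driver (your collect() use case), the plain "write it to a sink and read it
back" route Amit mentions is probably the least-magic option today. A rough
sketch, with the caveat that TextIO naming has changed across releases
(TextIO.Write.to(...) in older SDKs vs TextIO.write().to(...) later) and the
/tmp path and element values are just placeholders:

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;

public class WriteThenReadBack {
  public static void main(String[] args) throws IOException {
    Pipeline p = Pipeline.create();

    // Stand-in for the real (already reduced, small) PCollection<String>.
    p.apply(Create.of("result-1", "result-2", "result-3"))
     .apply(TextIO.write().to("/tmp/beam-collect/result").withSuffix(".txt"));

    p.run().waitUntilFinish();

    // Back in the driver process: read the output shards into local memory.
    List<String> collected = new ArrayList<>();
    try (DirectoryStream<Path> shards = Files.newDirectoryStream(
        Paths.get("/tmp/beam-collect"), "result*.txt")) {
      for (Path shard : shards) {
        collected.addAll(Files.readAllLines(shard));
      }
    }
    System.out.println(collected);
  }
}

A custom Sink (or a Scio-style closeAndCollect helper) is essentially a nicer
wrapper around the same idea.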
