Hi Rick,

One thing to keep in mind about the sample code you pasted: different instances of your DoFn may process different input elements on different machines, so there is no single in-memory ArrayList that every instance could append to.  There's a thread here:

https://lists.apache.org/thread.html/cb1d1fa4db613d885e3fc5eae42d6f97da28615fd3a11c58a2462d17@%3Cuser.beam.apache.org%3E

which lists two possible approaches that you might take.
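
To illustrate why appending to a local ArrayList from inside a DoFn does not work, here is a minimal sketch in plain Java with no Beam dependency. It assumes that a runner ships your DoFn to workers as a serialized copy; the Java serialization round trip below is only a stand-in for that step. The names SerializedCopyDemo, CollectingFn, roundTrip and run are hypothetical and exist only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

class SerializedCopyDemo {

    // Hypothetical stand-in for a DoFn: a serializable object that holds a list.
    static class CollectingFn implements Serializable {
        private static final long serialVersionUID = 1L;
        final ArrayList<Integer> collected;

        CollectingFn(ArrayList<Integer> collected) {
            this.collected = collected;
        }

        // Mirrors the processElement logic in the question: add 1, then record it.
        void process(int element) {
            collected.add(element + 1);
        }
    }

    // Serializes and deserializes an object, as an approximation of
    // (an assumption about) how a function object reaches a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns {size of the worker copy's list, size of the original list}.
    static int[] run() {
        ArrayList<Integer> myList = new ArrayList<>();
        CollectingFn workerCopy = roundTrip(new CollectingFn(myList));
        for (int i = 1; i <= 5; i++) {
            workerCopy.process(i);
        }
        return new int[] {workerCopy.collected.size(), myList.size()};
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("worker copy size=" + sizes[0]);   // prints 5
        System.out.println("original list size=" + sizes[1]); // prints 0
    }
}
```

The deserialized copy fills its own list while the list in the launching program stays empty. That is why the results have to come back through the pipeline itself rather than through a captured variable.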

On 10/30/2017 10:19 PM, linr...@itri.org.tw wrote:

Hi all,

I am Rick.

I would like to convert a PCollection to an ArrayList, and I am not sure whether my approach is right.

My environment is Java 1.8 and Beam 2.1.0.

My Java code is as follows:

ArrayList<Integer> myList = new ArrayList<Integer>();

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

PCollection<Integer> data = p.apply("data", Create.of(1, 2, 3, 4, 5));

PCollection<Integer> newdata = data.apply(ParDo.of(new DoFn<Integer, Integer>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        int datavalue = c.element() + 1;
        System.out.println("data=" + datavalue);
        c.output(datavalue);
    }
}));

p.run();

If you have any ideas to share, I would highly appreciate it.

Thanks

Rick





--
Wesley Tanaka
https://wtanaka.com/
