To convert a PCollection into an ArrayList locally within your application, you need to materialize the PCollection through a sink such as AvroIO or TextIO. Then, after the pipeline has completed, your program can read the output files and parse the records.
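As a rough sketch of the read-back half, assuming the pipeline wrote one integer per line with TextIO: the class name ReadPipelineOutput, the helper readShards, and the "output" directory and "part" prefix are all made up for illustration. The pipeline-side calls appear only in a comment and are my assumption about how the write could look, not something tested against Beam 2.1.0.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;

// Hypothetical helper: runs in the driver program AFTER the pipeline has finished.
//
// Assumed pipeline side (not executed here), writing one value per line:
//   newdata
//       .apply(MapElements.into(TypeDescriptors.strings())
//                         .via((Integer i) -> String.valueOf(i)))
//       .apply(TextIO.write().to("output/part"));
//   p.run().waitUntilFinish();
class ReadPipelineOutput {

    // Reads every file in `dir` whose name starts with `prefix` and parses
    // each non-empty line as an Integer. Element order across shards is not
    // meaningful, so sort afterwards if you need a stable order.
    static ArrayList<Integer> readShards(Path dir, String prefix) throws IOException {
        ArrayList<Integer> result = new ArrayList<>();
        try (DirectoryStream<Path> shards = Files.newDirectoryStream(dir, prefix + "*")) {
            for (Path shard : shards) {
                for (String line : Files.readAllLines(shard, StandardCharsets.UTF_8)) {
                    String trimmed = line.trim();
                    if (!trimmed.isEmpty()) {
                        result.add(Integer.parseInt(trimmed));
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // "output" is an assumed directory; pass a different one as args[0].
        Path dir = Paths.get(args.length > 0 ? args[0] : "output");
        if (!Files.isDirectory(dir)) {
            System.out.println("No output directory found at " + dir);
            return;
        }
        ArrayList<Integer> myList = readShards(dir, "part");
        System.out.println("Read " + myList.size() + " records");
    }
}
```

A sink normally writes several shard files, which is why the helper globs on a prefix rather than opening a single file.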
On Tue, Oct 31, 2017 at 1:29 AM, Wesley Tanaka <[email protected]> wrote:

> Hi Rick,
>
> In the sample code you pasted, one thing to keep in mind is that different
> instances of your DoFn may be processing different input elements on
> different machines. There's a thread here:
>
> https://lists.apache.org/thread.html/cb1d1fa4db613d885e3fc5eae42d6f97da28615fd3a11c58a2462d17@%3Cuser.beam.apache.org%3E
>
> which lists two possible approaches that you might take.
>
>
> On 10/30/2017 10:19 PM, [email protected] wrote:
>
> > Hi all,
> >
> > I am Rick
> >
> > I would like to transform datatype from PCollection to ArrayList, and I am
> > not sure if it is right?
> >
> > My run env is following Java: 1.8 and Beam: 2.1.0
> >
> > My java code is as:
> >
> > ArrayList<Integer> myList = new ArrayList<Integer>();
> >
> > PipelineOptions options = PipelineOptionsFactory.create();
> > Pipeline p = Pipeline.create(options);
> > PCollection<Integer> data=p.apply("data",Create.of(1,2,3,4,5));
> > PCollection<Integer> newdata=data.apply(ParDo.of(new DoFn<Integer,Integer>() {
> >     @ProcessElement
> >     public void processElement(ProcessContext c)
> >     {
> >         int datavalue=c.element()+1;
> >         System.out.println("data="+datavalue);
> >         c.output(datavalue);
> >     }
> > }));
> >
> > p.run();
> >
> > If any idea could be shared with me, I highly appreciate it.
> >
> > Thanks
> >
> > Rick
> >
> > --
> > This email may contain confidential information. Please do not use or
> > disclose it in any way and delete it if you are not the intended recipient.
>
> --
> Wesley Tanaka
> https://wtanaka.com/
