[ https://issues.apache.org/jira/browse/BEAM-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Cwik resolved BEAM-3210. ----------------------------- Resolution: Not A Problem Assignee: Luke Cwik (was: Thomas Groh) Fix Version/s: (was: 2.1.0) Not applicable > The problem about the use of waitUntilFinish() in DirectRunner > -------------------------------------------------------------- > > Key: BEAM-3210 > URL: https://issues.apache.org/jira/browse/BEAM-3210 > Project: Beam > Issue Type: Bug > Components: runner-direct > Affects Versions: 2.1.0 > Environment: Ubuntn 14.04.3 LTS > JDK 1.8 > Beam 2.1.0 > Maven 3.5.0 > Reporter: Rick Lin > Assignee: Luke Cwik > Fix For: Not applicable > > > Dear sir, > The description of waitUntilFinish() is "waits until the pipeline finishes > and returns the final status." > In my project, a static variable is used to record a PCollection context, > where the static variable is a data list type. > For this, I considered the "p.run().waitUntilFinish()" to wait until the > pipeline finishes to avoid the loss of record in the data list. > Unfortunately, there is a problem that the data list{color:#d04437} > *sometimes* {color}may record the "null" value instead of the realistic value > In order to clearly explain, i provide my java code in the following. > {color:#14892c}"import java.io.IOException; > import java.util.ArrayList; > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.options.PipelineOptions; > import org.apache.beam.sdk.options.PipelineOptionsFactory; > import org.apache.beam.sdk.transforms.Create; > import org.apache.beam.sdk.transforms.DoFn; > import org.apache.beam.sdk.transforms.Mean; > import org.apache.beam.sdk.transforms.ParDo; > import org.apache.beam.sdk.transforms.DoFn.ProcessContext; > import org.apache.beam.sdk.transforms.DoFn.ProcessElement; > public class BeamTestStatic extends Thread { > public static ArrayList<Double> myList = new ArrayList<Double>(); > public static class StaticTest extends DoFn<Double, Void> { > @ProcessElement > public void test(ProcessContext c) { > myList.add(c.element()); > } > } > public static void main(String[] args) throws IOException { > StaticTest testa=new StaticTest(); > PipelineOptions options = PipelineOptionsFactory.create(); > Pipeline p = Pipeline.create(options); > PCollection<Double> data=p.apply("Rawdata", > Create.of(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,)); > PCollection<Void> listtest= data.apply(ParDo.of(testa)); > p.run().waitUntilFinish(); > System.out.println("mylist_size_a="+myList.size()); > > for (int i = 0; i < myList.size(); i++) { > System.out.println("mylist_data="+myList.get(i)); > } > "{color} > In addition, the result of my code is: > {color:#205081}"mylist_size_a=10 > mylist_data=null > mylist_data=4.0 > mylist_data=5.0 > mylist_data=9.0 > mylist_data=6.0 > mylist_data=1.0 > mylist_data=7.0 > mylist_data=8.0 > mylist_data=10.0 > mylist_data=3.0"{color} > If you have any further information, I am glad to be informed. > Thanks > Rick -- This message was sent by Atlassian JIRA (v6.4.14#64029)