[ 
https://issues.apache.org/jira/browse/BEAM-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik resolved BEAM-3210.
-----------------------------
       Resolution: Not A Problem
         Assignee: Luke Cwik  (was: Thomas Groh)
    Fix Version/s:     (was: 2.1.0)
                   Not applicable

> The problem about the use of waitUntilFinish() in DirectRunner
> --------------------------------------------------------------
>
>                 Key: BEAM-3210
>                 URL: https://issues.apache.org/jira/browse/BEAM-3210
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>    Affects Versions: 2.1.0
>         Environment: Ubuntu 14.04.3 LTS
> JDK 1.8
> Beam 2.1.0
> Maven 3.5.0
>            Reporter: Rick Lin
>            Assignee: Luke Cwik
>             Fix For: Not applicable
>
>
> Dear sir,
> The description of waitUntilFinish() is "waits until the pipeline finishes
> and returns the final status."
> In my project, a static variable of list type is used to record the
> contents of a PCollection.
> I therefore call "p.run().waitUntilFinish()" so that the pipeline finishes
> before the list is read and no records are lost.
> Unfortunately, the list *sometimes* contains a "null" value instead of the
> actual value.
> To explain this clearly, my Java code is provided below.
> import java.util.ArrayList;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.values.PCollection;
>
> public class BeamTestStatic {
>   // Static list shared by every invocation of the DoFn below.
>   public static ArrayList<Double> myList = new ArrayList<Double>();
>
>   // Appends each input element to the shared static list.
>   public static class StaticTest extends DoFn<Double, Void> {
>     @ProcessElement
>     public void test(ProcessContext c) {
>       myList.add(c.element());
>     }
>   }
>
>   public static void main(String[] args) {
>     StaticTest testa = new StaticTest();
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline p = Pipeline.create(options);
>     PCollection<Double> data = p.apply("Rawdata",
>         Create.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0));
>     PCollection<Void> listtest = data.apply(ParDo.of(testa));
>     // Block until the pipeline finishes, then print what was recorded.
>     p.run().waitUntilFinish();
>     System.out.println("mylist_size_a=" + myList.size());
>     for (int i = 0; i < myList.size(); i++) {
>       System.out.println("mylist_data=" + myList.get(i));
>     }
>   }
> }
> The output of my code is:
> mylist_size_a=10
> mylist_data=null
> mylist_data=4.0
> mylist_data=5.0
> mylist_data=9.0
> mylist_data=6.0
> mylist_data=1.0
> mylist_data=7.0
> mylist_data=8.0
> mylist_data=10.0
> mylist_data=3.0
> I would be glad to receive any further information.
> Thanks
> Rick
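
One possible reading of the report above, offered as an assumption and not as the stated reason for the resolution: java.util.ArrayList is not thread-safe, so if a runner invokes the DoFn from several worker threads, concurrent add() calls on the shared list can leave a null slot even though the final size is 10. The sketch below uses only the JDK. The class name SharedListSketch, the helper collectConcurrently, and the thread pool that stands in for the runner are all hypothetical and are not part of Beam.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedListSketch {
  // Hypothetical helper: adds 1.0..count to one shared list from several
  // threads, standing in for a runner that processes elements in parallel.
  static List<Double> collectConcurrently(int threads, int count)
      throws InterruptedException {
    // synchronizedList serializes each add(); a bare ArrayList gives no
    // such guarantee when several threads append at once.
    List<Double> sink = Collections.synchronizedList(new ArrayList<>());
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 1; i <= count; i++) {
      final double value = i;
      pool.execute(() -> sink.add(value));
    }
    pool.shutdown();
    // Wait for all workers, analogous to blocking until the pipeline is done.
    if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
      throw new IllegalStateException("workers did not finish in time");
    }
    return sink;
  }

  public static void main(String[] args) throws InterruptedException {
    List<Double> result = collectConcurrently(4, 10);
    // Element order depends on thread scheduling and may differ between runs.
    System.out.println("size=" + result.size()
        + ", containsNull=" + result.contains(null));
  }
}
```

With the synchronized wrapper every element is recorded exactly once, although the order still varies from run to run. A static list of this kind only works when everything runs in one JVM, so it should be read as a local debugging aid.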



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
