Thanks Raghu... we are on the same page, but I am not sure how it applies to my
own code :(
Pipeline p = Pipeline.create(options);
...etc...
try {
    PCollection<KV<String, String>> kafkarecords =
        p.apply(KafkaIO.read()
                .withBootstrapServers("kafhost:9092")
                .withTopics(kaftopic)
                .withValueCoder(StringUtf8Coder.of())
                .withoutMetadata())
         .apply(ParDo.named("startBundle").of(
             new DoFn<KV<byte[], String>, KV<String, String>>() {
                 ...etc...
                 @Override
                 public void processElement(ProcessContext ctx) throws Exception {
                     ...etc...
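One possible explanation for a DoFn that appears to process more records than it received (an assumption, since the processElement body above is elided): Beam runners may re-execute a bundle after a failure and discard the outputs of the failed attempt, but any side effect performed inside processElement, such as incrementing a counter field, is not rolled back. The toy model below is plain Java, not Beam's API; BundleRetryDemo, processBundle and run are hypothetical names used only to illustrate the effect.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT Beam's API): a "runner" processes records in bundles and retries
// a bundle that fails. Outputs of a failed attempt are discarded, but side effects
// performed during that attempt are not undone.
class BundleRetryDemo {

    // Side-effect counter, incremented every time an element is processed.
    static long sideEffectCount = 0;

    // Processes one bundle; if failMidway is true, throws after handling part of it.
    static List<String> processBundle(List<String> bundle, boolean failMidway) {
        List<String> outputs = new ArrayList<>();
        for (int i = 0; i < bundle.size(); i++) {
            sideEffectCount++;                          // survives a retry
            outputs.add(bundle.get(i).toUpperCase());   // discarded if the attempt fails
            if (failMidway && i == bundle.size() / 2) {
                throw new RuntimeException("simulated worker failure");
            }
        }
        return outputs;
    }

    // Runs every bundle; when failFirstAttempt is true, each bundle's first attempt fails.
    static List<String> run(List<List<String>> bundles, boolean failFirstAttempt) {
        List<String> committed = new ArrayList<>();
        for (List<String> bundle : bundles) {
            boolean fail = failFirstAttempt;
            while (true) {
                try {
                    committed.addAll(processBundle(bundle, fail)); // commit only on success
                    break;
                } catch (RuntimeException e) {
                    fail = false;                                  // retry the whole bundle
                }
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        List<List<String>> bundles = List.of(
            List.of("a", "b", "c", "d"),
            List.of("e", "f", "g", "h"));
        List<String> out = run(bundles, true);
        System.out.println("committed outputs: " + out.size());    // prints 8
        System.out.println("side-effect count: " + sideEffectCount); // prints 14
    }
}
```

Eight records go in and eight outputs are committed, yet the side-effect counter reads 14. Counting the DoFn's committed outputs with an aggregation such as Count.globally(), rather than a counter mutated inside processElement, gives a number that does not depend on retries.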
I implement the DoFn as an anonymous class, and there each record is supposedly
processed one at a time. But the resulting count always exceeds the expected
value; it is inconsistent and varies from run to run. I have my doubts about how
to guarantee the consistency of an "aggregation" given the pipeline's
"parallelism". I receive n records (that is a proven fact), but my
transformation processes more records than that. Sorry for the long email; I
just don't know how to verify the pipeline's consistency.

Cheers

From: Raghu Angadi <[email protected]>
To: amir bahmanyari <[email protected]>
Cc: "[email protected]" <[email protected]>
Sent: Tuesday, August 2, 2016 10:31 PM
Subject: Re: Any data gets cached?
   

On Tue, Aug 2, 2016 at 4:37 PM, amir bahmanyari <[email protected]> wrote:

     .apply(new CountWords()) 

// counts the occurrences of each word and returns each count as a Long
                                                      
That is an aggregation. 
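Why counting is an aggregation: every occurrence of the same word has to be brought together and reduced to a single number, so each output depends on many input elements. In Beam's WordCount example, which this pipeline appears to follow, CountWords finishes with Count.perElement(). The sketch below is a plain-Java, single-machine analogue, not Beam code; WordCountSketch, countWords and the tokenizing regex are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java analogue of a per-word count (NOT Beam's API). Identical words are
// grouped and each group is reduced to one Long, which is what makes it an aggregation.
class WordCountSketch {

    // Hypothetical helper: split lines into words (regex is an assumption),
    // then group identical words and count each group.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
            .flatMap(line -> Arrays.stream(line.split("[^\\p{L}']+")))
            .filter(word -> !word.isEmpty())
            .collect(Collectors.groupingBy(
                Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // prints {be=2, not=1, or=1, to=2}
        System.out.println(countWords(List.of("to be or", "not to be")));
    }
}
```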
// Format the Long results produced by CountWords as text
.apply(MapElements.via(new FormatAsTextFn()))

This is not. 
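By contrast, an element-wise step such as MapElements computes each output from exactly one input, with no grouping across elements, so it never changes how many results there are. The sketch below is plain Java, not Beam code; formatAsText is a hypothetical stand-in for FormatAsTextFn, and the "word: count" layout is an assumption modeled on Beam's WordCount example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java analogue of an element-wise map (NOT Beam's API):
// one input entry in, one formatted line out.
class FormatSketch {

    // Hypothetical stand-in for FormatAsTextFn; output layout is an assumption.
    static String formatAsText(Map.Entry<String, Long> wordCount) {
        return wordCount.getKey() + ": " + wordCount.getValue();
    }

    // Applies formatAsText to every entry independently.
    static List<String> formatAll(Map<String, Long> counts) {
        return counts.entrySet().stream()
            .map(FormatSketch::formatAsText)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new TreeMap<>(Map.of("be", 2L, "or", 1L));
        System.out.println(formatAll(counts)); // prints [be: 2, or: 1]
    }
}
```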
// Use TextIO to write the results as text to the output file set in the options