Thanks Lukasz. "logging the records you see for each bundle" makes me wonder why you are referring to a "bundle"? Sorry, my confusion.
My assumption is that I receive "one record" at a time from Kafka, and I am executing "one record" at a time in the DoFn class object. Is there something in the way I am invoking KafkaIO that translates to a "bundle" rather than a "single record" at a time?

try {
    PCollection<KV<String, String>> kafkarecords = p
        .apply(KafkaIO.read()
            .withBootstrapServers("kafkaprt:9092")
            .withTopics(topics)
            .withValueCoder(StringUtf8Coder.of())
            .withoutMetadata())
        .apply(ParDo.named("startBundle").of(new DoFn<KV<byte[], String>, KV<String, String>>() {
Perhaps I am overlapping / repeating something somehow... Thanks again Lukasz.
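One way to see what the runner is actually doing is to log the bundle boundaries themselves. Below is a minimal sketch, assuming the pre-2.0-style DoFn used in the snippet above (with overridable startBundle/finishBundle); the class name, logger, and the byte[]-key-to-String conversion are mine, not from the original pipeline:

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch: logs where each bundle starts and ends so the runner's real
// bundle size (one record vs. many) shows up in the task logs.
public class BundleLoggingFn extends DoFn<KV<byte[], String>, KV<String, String>> {
  private static final Logger LOG = LoggerFactory.getLogger(BundleLoggingFn.class);
  private transient int elementsInBundle;

  @Override
  public void startBundle(Context c) {
    elementsInBundle = 0;
    LOG.info("startBundle");
  }

  @Override
  public void processElement(ProcessContext c) {
    elementsInBundle++;
    LOG.info("bundle element #{}: {}", elementsInBundle, c.element().getValue());
    byte[] key = c.element().getKey();
    String k = key == null ? "" : new String(key, StandardCharsets.UTF_8);
    c.output(KV.of(k, c.element().getValue()));
  }

  @Override
  public void finishBundle(Context c) {
    LOG.info("finishBundle after {} element(s)", elementsInBundle);
  }
}

Wiring it in with something like .apply(ParDo.named("logBundles").of(new BundleLoggingFn())) would show directly whether the FlinkRunner hands this DoFn single-element bundles or larger ones.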

      From: Lukasz Cwik <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Wednesday, July 13, 2016 12:38 PM
 Subject: Re: Random behavior with my Beam FlinkRunner streaming app
   
Try using smaller datasets and logging the records you see for each bundle in 
your DoFns. This will help you see how your data is transitioning through the 
pipeline.
On Wed, Jul 13, 2016 at 3:30 PM, amir bahmanyari <[email protected]> wrote:

Thanks Lukasz, I receive records via Kafka into my Beam app with KafkaIO.read(). And this is how:

try {
    PCollection<KV<String, String>> kafkarecords = p
        .apply(KafkaIO.read()
            .withBootstrapServers("kafkaprt:9092")
            .withTopics(topics)
            .withValueCoder(StringUtf8Coder.of())
            .withoutMetadata())
        .apply(ParDo.named("startBundle").of(new DoFn<KV<byte[], String>, KV<String, String>>() {
            .....etc.
Please let me know if I should provide further, deeper details. I appreciate your attention. I am sure there are lessons for me to learn from "this" :-)
Cheers + thanks so much.


      From: Lukasz Cwik <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Wednesday, July 13, 2016 12:22 PM
 Subject: Re: Random behavior with my Beam FlinkRunner streaming app
  
Are your DoFns idempotent, and do they avoid relying on the ordering of elements? Do you use any triggers?
Lots of things can add non-determinism to your output; we need more details about what your pipeline does. Using smaller input datasets can help you track down the source of non-determinism.
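As an illustration of the kind of order dependence being asked about (a made-up example, not taken from the pipeline under discussion): a DoFn that keeps mutable state across elements produces output that depends on which elements each instance happens to see, and in what order, so results vary with parallelism and bundling.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Anti-pattern sketch: the output of each element depends on the previous
// element seen by this particular DoFn instance.
public class OrderDependentFn extends DoFn<KV<String, Long>, KV<String, Long>> {
  private Long previousValue;   // mutable state shared across elements

  @Override
  public void processElement(ProcessContext c) {
    long value = c.element().getValue();
    // Non-deterministic across runs: "previousValue" depends on sharding and arrival order.
    long delta = (previousValue == null) ? value : value - previousValue;
    previousValue = value;
    c.output(KV.of(c.element().getKey(), delta));
  }
}

A deterministic version would compute its output from c.element() alone, and use Beam's grouping or combining primitives for anything that needs to look across elements.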


On Wed, Jul 13, 2016 at 3:09 PM, amir bahmanyari <[email protected]> wrote:

Hi Colleagues,
I am getting random results for:
- the exact same data input
- the exact same app binary
- the exact same Flink cluster instances
Everything is fixed; I just repeat running the same thing. Every time, I get a different result, while the data doesn't change, the code doesn't change, and the logic to calculate the results is exactly the same.
Is Beam "parallelism" playing a role due to something "unusual" in my code? What could the "unusual" thing be in the app that makes the Beam pipeline produce different results for the exact same "everything"?
Thanks+regards,
Amir-