Hi colleagues,

Can someone help solve this mystery please?

Thanks + regards,
Amir-

      From: amir bahmanyari <[email protected]>
 To: "[email protected]" <[email protected]> 
 Sent: Sunday, August 7, 2016 11:44 PM
 Subject: Is Beam pipeline runtime behavior inconsistent?
   
Hi Colleagues,

I refrained from posting this email before completing thorough testing, and I think I have now done that. My core code works perfectly and produces the expected result every single time when it is not wrapped in a Beam KafkaIO pipeline to receive the data. Without KafkaIO, it receives the records from a flat data file; I repeated the run and it always produced the right result. When I include Beam KafkaIO and embed the exact same code in an anonymous class running in a Beam pipeline, I get a different result every time I rerun it.

Below is the snippet from where KafkaIO executes until a record lands on the processElement() method. Kafka sends the precise number of records, no duplicates; all good. While executing in Beam, once the records are finished and I expect a correct result, it always produces something different, and something different again in different runs.

I would appreciate any light you can shed on this issue, and thanks for your valuable time as always.

Amir-
public static synchronized void main(String[] args) throws Exception {

    // Create Beam Options for the Flink Runner.
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    // Set the Streaming engine as FlinkRunner.
    options.setRunner(FlinkPipelineRunner.class);
    // This is a Streaming process (as opposed to Batch).
    options.setStreaming(true);

    // Create the DAG pipeline for parallel processing of independent LR records.
    Pipeline p = Pipeline.create(options);

    // Kafka broker topic is identified as "lroad".
    List<String> topics = Arrays.asList("lroad");

    PCollection<String> kafkarecords = p
        .apply(KafkaIO.read()
            .withBootstrapServers("kafkahost:9092")
            .withTopics(topics)
            .withValueCoder(StringUtf8Coder.of())
            .withoutMetadata())
        .apply(Values.<String>create())
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes());

    kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
        public void processElement(ProcessContext ctx) throws Exception {
            // My core logic code here.
        }
    }));

    p.run(); // Start Beam Pipeline(s) in the Flink cluster.
} // of main
} // of class
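
For comparison, here is a minimal sketch of the flat-file variant described above. It assumes the records were read with TextIO.Read in a simple, non-KafkaIO pipeline; the file path, the step name, and the use of TextIO itself are assumptions for illustration, since the actual file-reading code is not shown in this email.

public static void main(String[] args) throws Exception {
    // Plain pipeline options; no streaming source, so no windowing or triggering is set up.
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    // Read the same LR records from a flat data file (hypothetical path).
    PCollection<String> filerecords = p.apply(TextIO.Read.from("/path/to/lroad-records.txt"));

    // Apply the exact same core logic to the file-based records.
    filerecords.apply(ParDo.named("ProcessLRFileData").of(new DoFn<String, String>() {
        public void processElement(ProcessContext ctx) throws Exception {
            // My core logic code here (unchanged from the KafkaIO version).
        }
    }));

    p.run();
} // of main

The sketch is only meant to make the comparison concrete: the same anonymous DoFn body, but fed from a bounded file read rather than from the unbounded, windowed KafkaIO source.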

  
