Hi colleagues,

Can someone help to solve this mystery, please?

Thanks and regards,
Amir
From: amir bahmanyari <[email protected]>
To: "[email protected]" <[email protected]>
Sent: Sunday, August 7, 2016 11:44 PM
Subject: Is Beam pipeline runtime behavior inconsistent?
Hi Colleagues,

I refrained from posting this email before completing thorough testing, which I believe I have now done. My core code works perfectly and produces the expected result every single time when it is not wrapped in a Beam KafkaIO source. Without KafkaIO, it reads the records from a flat data file; I repeated the run and it always produced the right result.

When I include a Beam KafkaIO source and embed the exact same code in an anonymous class running in a Beam pipeline, I get a different result every time I rerun it. Below is the snippet covering the path from where KafkaIO executes until a record lands on the processElement method. Kafka sends a precise number of records, with no duplicates; all good. But while executing in Beam, once the records are finished and I expect a correct result, it always produces something different, and the result differs from run to run.

I would appreciate any light you can shed on this issue, and thanks for your valuable time as always.

Amir
public static synchronized void main(String[] args) throws Exception {
    // Create Beam options for the Flink runner.
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    // Set FlinkRunner as the execution engine.
    options.setRunner(FlinkPipelineRunner.class);
    // This is a streaming process (as opposed to batch).
    options.setStreaming(true);

    // Create the DAG pipeline for parallel processing of independent LR records.
    Pipeline p = Pipeline.create(options);

    // The Kafka broker topic is named "lroad".
    List<String> topics = Arrays.asList("lroad");
    PCollection<String> kafkarecords = p
        .apply(KafkaIO.read()
            .withBootstrapServers("kafkahost:9092")
            .withTopics(topics)
            .withValueCoder(StringUtf8Coder.of())
            .withoutMetadata())
        .apply(Values.<String>create())
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes());

    kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
        public void processElement(ProcessContext ctx) throws Exception {
            // My core logic code here.
        }
    }));

    // Start the Beam pipeline on the Flink cluster.
    p.run();
} // of main
// of class
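For context on what I mean by a run-to-run deterministic result: outside Beam, the same kind of parallel accumulation over a fixed number of records gives an identical total on every run when the shared accumulator is properly synchronized. The sketch below is only an illustration of that expectation in plain Java (class and method names are hypothetical, and this is not my pipeline code or anything Beam-specific):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DeterministicCountDemo {
    // Counts a fixed number of records on several worker threads.
    // Because AtomicInteger is safe under concurrent updates, the
    // total is the same on every run, regardless of thread scheduling.
    static int countRecords(int records, int threads) throws InterruptedException {
        AtomicInteger total = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < records; i++) {
            pool.submit(total::incrementAndGet);
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return total.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints the same total on every run.
        System.out.println(countRecords(10_000, 8));
    }
}
```

This is the behavior I see from my core logic when it reads from the flat file; it is only under the Beam/KafkaIO wrapper that the result varies between runs.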