Just to make sure I understand the problem: You have a specific collection of records stored in Kafka. You run your pipeline and observe duplicate elements. Is that accurate?
Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)? Also, I'm not sure what you mean by "executes till a record lands on method." Finally, is this reproducible if you execute with the DirectRunner?

On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <[email protected]> wrote:

> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfectly & produces the expected result every single
> time without wrapping it with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> With a Beam KafkaIO included and the exact same code embedded in an
> anonymous class running Beam pipelines, I get a different result every
> time I rerun it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends the precise number of records. No duplicates. All good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue. And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
>     // Create Beam options for the Flink runner.
>     FlinkPipelineOptions options =
>         PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>     // Set the streaming engine as FlinkRunner.
>     options.setRunner(FlinkPipelineRunner.class);
>     // This is a streaming process (as opposed to batch).
>     options.setStreaming(true);
>     // Create the DAG pipeline for parallel processing of independent LR records.
>     Pipeline p = Pipeline.create(options);
>     // The Kafka broker topic is identified as "lroad".
>     List<String> topics = Arrays.asList("lroad");
>
>     PCollection<String> kafkarecords = p
>         .apply(KafkaIO.read()
>             .withBootstrapServers("kafkahost:9092")
>             .withTopics(topics)
>             .withValueCoder(StringUtf8Coder.of())
>             .withoutMetadata())
>         .apply(Values.<String>create())
>         .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>             .triggering(AfterWatermark.pastEndOfWindow())
>             .withAllowedLateness(Duration.ZERO)
>             .accumulatingFiredPanes());
>
>     kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(
>         new DoFn<String, String>() {
>             public void processElement(ProcessContext ctx) throws Exception {
>                 *My core logic code here.*
>             }
>         }));
>     .
>     .
>     p.run(); // Start Beam pipeline(s) in the Flink cluster.
> } // of main
> } // of class
>
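To make the Count.globally() check above concrete, here is a minimal sketch in the style of the 2016-era Beam Java SDK used in the quoted snippet. The `CountCheck` class name and the `Create.of(...)` stand-in data are my own illustration, not part of the original pipeline; for the real check, substitute the `kafkarecords` PCollection from the quoted code for `records`. This is a sketch assuming the Beam SDK is on the classpath, not something tested against a live cluster:

```java
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class CountCheck {
    public static void main(String[] args) {
        // No runner is set explicitly, so this falls back to the
        // DirectRunner -- useful for checking whether the duplicates
        // reproduce outside the Flink runner.
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Stand-in bounded source for illustration; in the real check,
        // use the kafkarecords PCollection from the quoted pipeline.
        PCollection<String> records =
            p.apply(Create.of(Arrays.asList("r1", "r2", "r3")));

        // Count every element. The printed total should match the number
        // of records Kafka actually delivered; a larger total would
        // confirm that duplicates are entering the pipeline.
        records.apply(Count.<String>globally())
               .apply(ParDo.of(new DoFn<Long, Void>() {
                   public void processElement(ProcessContext ctx) {
                       System.out.println("total records: " + ctx.element());
                   }
               }));

        p.run();
    }
}
```

Running this twice and comparing the printed totals (to each other and to the count of records published to the topic) separates "the source delivers duplicates" from "my core logic mis-aggregates."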
