After some investigation, the problem I see is likely caused by a filter and union of the DStream. If I just do

    kafka-stream -- process -- output

then there is no problem: each event is fetched exactly once. But if I do

    kafka-stream -- process(1) --+-- filter --> stream A (for later union) ------+
                                 |                                               +-- A union B -- output -- process(3)
                                 +-- filter -- process(2) --> stream B ----------+

the event is fetched twice; the duplicate fetch starts right at the end of process(1). See the following traces:
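Before the traces, for concreteness, here is roughly what that topology looks like in Spark's Java API. This is a hypothetical sketch (the stream names, predicates, process2, and outputPath are made up, not the author's actual code). If the union is forcing the upstream KafkaRDD to be recomputed, persisting the stream before it branches may avoid the second fetch:

```java
// Sketch only: "events" stands in for the DStream produced by process(1)
// on the Kafka direct stream; process2 and outputPath are placeholders.
JavaDStream<String> events = kafkaStream.map(record -> record.value());

// Persist before branching: both filters below can then re-use the cached
// partitions instead of recomputing (and re-fetching) the KafkaRDD.
events.cache();

JavaDStream<String> streamA = events.filter(e -> e.startsWith("A"));
JavaDStream<String> streamB = events.filter(e -> e.startsWith("B"))
                                    .map(e -> process2(e));   // process(2)

// process(3): union the two branches and write the output.
streamA.union(streamB).foreachRDD(rdd -> rdd.saveAsTextFile(outputPath));
```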
16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for spark-executor-testgid log-analysis-topic 2 1   *(fetch EVENT 1st time)*
16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator 192.168.2.6:9092 (id: 2147483647 rack: null) for group spark-executor-testgid
  *(... log of processing (1) for event 1 ...)*
16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36). 1401 bytes result sent to driver
16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 36) in 3494 ms on localhost (3/3)
16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool
16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair *(processing (1))* at SparkAppDriver.java:136) finished in 3.506 s
16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages
16/09/10 00:11:03 INFO DAGScheduler: running: Set()
16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10, ResultStage 11)
16/09/10 00:11:03 INFO DAGScheduler: failed: Set()
16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10 (UnionRDD[41] at union *(process (3))* at SparkAppDriver.java:155), which has no missing parents
16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)
16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic, partition 2 offsets 1 -> 2
16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for spark-executor-testgid log-analysis-topic 2 1   *(fetch the same EVENT 2nd time)*
16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job 1473491460000 ms.0 from job set of time 1473491460000 ms
16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time 1473491460000 ms (execution: 10.874 s)   *(1st processing of the event cost 10.874 s)*
16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job 1473491465000 ms.0 from job set of time 1473491465000 ms
16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time 1473491465000 ms (execution: 0.066 s)   *(2nd processing of the event cost 0.066 s)*

Note that the 2nd processing of the event finished without really doing the work. Help is hugely appreciated.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-connector-questions-tp27681p27687.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.