After some investigation, the problem I see is likely caused by a filter and
union of the DStream.
If I just do kafka-stream -- process -- output operator, then there is no
problem: each event is fetched once.
If I do

kafka-stream -- process(1) --+-- filter stream A (for later union) --+
                             |                                       +-- A union B -- output process(3)
                             +-- filter stream B -- process(2) ------+

the event is fetched 2 times. The duplicate message starts being processed at
the end of process(1); see the following traces:

16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*

16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator (id: 2147483647 rack: null) for group

log of processing (1) for event 1

16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36).
1401 bytes result sent to driver

16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID
36) in 3494 ms on localhost (3/3)

16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
have all completed, from pool 

16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair
(*processing (1)*) at finished in 3.506 s

16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages

16/09/10 00:11:03 INFO DAGScheduler: running: Set()

16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10,
ResultStage 11)

16/09/10 00:11:03 INFO DAGScheduler: failed: Set()

16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10
(UnionRDD[41] at union (*process (3)*) at, which
has no missing parents

16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from
ShuffleMapStage 10 (UnionRDD[41] at union at

16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic,
partition 2 offsets 1 -> 2

16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for
spark-executor-testgid log-analysis-topic 2 1 *(fetch the same EVENT 2nd time)*

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
1473491460000 ms.0 from job set of time 1473491460000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time
1473491460000 ms (execution: 10.874 s) *(EVENT 1st time process cost 10.874)*

16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
1473491465000 ms.0 from job set of time 1473491465000 ms

16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time
1473491465000 ms (execution: 0.066 s) *(EVENT 2nd time process cost 0.066)*

and the 2nd time processing of the event finished without really doing the
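
To make the branching topology concrete, here is a small plain-Java analogy (not Spark code). It assumes the duplicate fetch comes from each branch lazily re-evaluating its upstream, which the traces suggest but do not prove. All names (BranchUnionSketch, fetchAndProcess1, memoize, runPipeline) are hypothetical stand-ins for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class BranchUnionSketch {
    // Counts how often the simulated source is read.
    static final AtomicInteger fetchCount = new AtomicInteger();

    // Hypothetical stand-in for "fetch from Kafka, then process(1)".
    static List<String> fetchAndProcess1() {
        fetchCount.incrementAndGet();
        List<String> out = new ArrayList<>();
        out.add("A:event-1");
        out.add("B:event-1");
        return out;
    }

    // Wraps a supplier so that it is evaluated at most once
    // (the analogy for caching the output of process(1)).
    static <T> Supplier<T> memoize(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean computed;

            @Override
            public T get() {
                if (!computed) {
                    value = source.get();
                    computed = true;
                }
                return value;
            }
        };
    }

    // Builds branch A, branch B with process(2), and their union,
    // then returns how many times the source was read.
    static int runPipeline(Supplier<List<String>> processed) {
        fetchCount.set(0);
        List<String> a = processed.get().stream()
                .filter(x -> x.startsWith("A:"))
                .collect(Collectors.toList());
        List<String> b = processed.get().stream()
                .filter(x -> x.startsWith("B:"))
                .map(x -> x + ":process2")
                .collect(Collectors.toList());
        List<String> union = new ArrayList<>(a);
        union.addAll(b); // input to output process(3)
        return fetchCount.get();
    }

    public static void main(String[] args) {
        // Each branch re-evaluates the upstream: prints 2
        System.out.println(runPipeline(BranchUnionSketch::fetchAndProcess1));
        // Upstream evaluated once and shared: prints 1
        System.out.println(runPipeline(memoize(BranchUnionSketch::fetchAndProcess1)));
    }
}
```

If the same mechanism is at work in the real job, the corresponding thing to try would be calling cache() or persist() on the stream produced by process(1), before the two filters. I have not verified that this removes the second fetch here.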

Help is hugely appreciated.
