Re: spark streaming kafka connector questions
Thanks, that is what I was missing. I have added cache() before the actions, and the 2nd processing is avoided.

2016-09-10 5:10 GMT-07:00 Cody Koeninger:
> Hard to say without seeing the code, but if you do multiple actions on an
> RDD without caching, the RDD will be computed multiple times. [...]
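For the record, a minimal sketch of the fix: call cache() on the processed stream before it is branched, so the two filter branches and the union reuse each computed batch instead of triggering a second Kafka fetch of the same offsets. This uses the 0.8 direct-stream API quoted later in this thread; the broker address, topic, and the map/filter bodies standing in for process(1), process(2), and the A/B filters are placeholders, not the actual SparkAppDriver code.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    import kafka.serializer.StringDecoder;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    public class CacheBeforeUnion {
      public static void main(String[] args) throws Exception {
        JavaStreamingContext jsc = new JavaStreamingContext(
            new SparkConf().setAppName("cache-before-union").setMaster("local[*]"),
            Durations.seconds(5));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092"); // assumed broker
        Set<String> topics = Collections.singleton("log-analysis-topic");

        JavaPairInputDStream<String, String> ds = KafkaUtils.createDirectStream(
            jsc, String.class, String.class,
            StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        // process(1): stand-in for the real flatMapToPair transformation.
        JavaDStream<String> processed = ds.map(t -> t._2());

        // The key line: persist each batch RDD so the two filter branches and
        // the union below reuse the computed data instead of triggering a
        // second KafkaRDD fetch of the same offsets.
        JavaDStream<String> cached = processed.cache();

        JavaDStream<String> streamA = cached.filter(s -> s.contains("A"));
        JavaDStream<String> streamB = cached.filter(s -> s.contains("B"))
            .map(s -> s.toUpperCase());         // stand-in for process(2)

        streamA.union(streamB).print();         // stand-in for output process(3)

        jsc.start();
        jsc.awaitTermination();
      }
    }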
Re: spark streaming kafka connector questions
Hard to say without seeing the code, but if you do multiple actions on an RDD without caching, the RDD will be computed multiple times.

On Sep 10, 2016 2:43 AM, "Cheng Yi" wrote:
> After some investigation, the problem I see is likely caused by a filter
> and union of the DStream. [...]
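Cody's point can be seen in isolation with the batch API. In the following self-contained sketch (arbitrary data and names), an accumulator counts how many times the map function body actually runs; note that accumulator updates inside transformations can over-count under task retries, which is acceptable for a demonstration.

    import java.util.Arrays;

    import org.apache.spark.Accumulator;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class RecomputeDemo {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setAppName("recompute-demo").setMaster("local[*]"));

        // Counts how many times the map function body actually executes.
        Accumulator<Integer> uncachedCalls = sc.accumulator(0);
        JavaRDD<Integer> mapped = sc.parallelize(Arrays.asList(1, 2, 3))
            .map(x -> { uncachedCalls.add(1); return x * 2; });

        mapped.count();                            // 1st action computes the map
        mapped.count();                            // 2nd action recomputes it
        System.out.println(uncachedCalls.value()); // prints 6: lineage ran twice

        Accumulator<Integer> cachedCalls = sc.accumulator(0);
        JavaRDD<Integer> cached = sc.parallelize(Arrays.asList(1, 2, 3))
            .map(x -> { cachedCalls.add(1); return x * 2; })
            .cache();                              // mark for caching before acting

        cached.count();                            // computes and caches
        cached.count();                            // served from the cache
        System.out.println(cachedCalls.value());   // prints 3: computed once

        sc.stop();
      }
    }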
Re: spark streaming kafka connector questions
After some investigation, the problem I see is likely caused by a filter and union of the DStream.

If I just do kafka-stream -- process -- output, there is no problem: one event is fetched exactly once.

If I do:

kafka-stream -- process(1) --+-- filter stream A (for union) --------+
                             |                                       |
                             +-- filter stream B -- process(2) ------+-- A union B -- output process(3)

the event is fetched twice, and the duplicate message starts processing at the end of process(1). See the following traces:

16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*
16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator 192.168.2.6:9092 (id: 2147483647 rack: null) for group spark-executor-testgid.

[... log of processing (1) for event 1 ...]

16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36). 1401 bytes result sent to driver
16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 36) in 3494 ms on localhost (3/3)
16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool
16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair (*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s
16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages
16/09/10 00:11:03 INFO DAGScheduler: running: Set()
16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10, ResultStage 11)
16/09/10 00:11:03 INFO DAGScheduler: failed: Set()
16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10 (UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which has no missing parents
16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)
16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic, partition 2 offsets 1 -> 2
16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for spark-executor-testgid log-analysis-topic 2 1 *(fetch the same EVENT 2nd time)*
16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job 147349146 ms.0 from job set of time 147349146 ms
16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time 147349146 ms (execution: 10.874 s) *(EVENT 1st-time processing cost 10.874 s)*
16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job 1473491465000 ms.0 from job set of time 1473491465000 ms
16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time 1473491465000 ms (execution: 0.066 s) *(EVENT 2nd-time processing cost 0.066 s)*

The 2nd-time processing of the event finished without really doing the work.

Help is hugely appreciated.
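One way to check this from the application itself, rather than eyeballing the INFO logs, is to log each batch's Kafka offset ranges and correlate them with the executor-side "KafkaRDD: Computing" lines. A sketch against the 0.8 connector, assuming ds is the JavaPairInputDStream from a createDirectStream call like the one quoted later in this thread:

    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.OffsetRange;

    // 'ds' is assumed to be the JavaPairInputDStream<String, String> returned
    // by KafkaUtils.createDirectStream. The HasOffsetRanges cast only works on
    // the stream's own RDDs, before any other transformation.
    JavaPairDStream<String, String> withOffsetLogging = ds.transformToPair(rdd -> {
      // Runs once per batch on the driver while the DAG is built.
      for (OffsetRange r : ((HasOffsetRanges) rdd.rdd()).offsetRanges()) {
        System.out.println("batch covers " + r.topic() + "-" + r.partition()
            + " offsets [" + r.fromOffset() + ", " + r.untilOffset() + ")");
      }
      return rdd;
    });
    // Correlate these driver-side lines with the executor-side
    // "KafkaRDD: Computing topic ... offsets a -> b" entries: if executors
    // compute the same range twice within one batch, the lineage is running
    // twice and the stream needs cache()/persist() before it is branched.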
Re: spark streaming kafka connector questions
Cody, thanks for the message.

1. As you mentioned, I do find the version for Kafka 0.10.1 and will use that, although it carries lots of experimental tags. Thank you. (A sketch of that API is below this message.)
2. I have investigated thoroughly: it is NOT a scenario where the 1st processing attempt failed and a 2nd attempt was then triggered.
3. I agree the session timeout and auto commit are not the root cause here.
4. The problem I see is likely caused by a filter and union of the DStream (I will try to elaborate in another question post).

If I just do kafka-stream -- process -- output, there is no problem.

If I do:

kafka-stream -- process(1) --+-- filter stream A (for union) --------+
                             |                                       |
                             +-- filter stream B -- process(2) ------+-- A union B -- output process(3)

the duplicate message starts processing at the end of process(1). See the following traces:

16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*
16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator 192.168.2.6:9092 (id: 2147483647 rack: null) for group spark-executor-testgid.

[... log of processing (1) for event 1 ...]

16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36). 1401 bytes result sent to driver
16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 36) in 3494 ms on localhost (3/3)
16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool
16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair (*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s
16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages
16/09/10 00:11:03 INFO DAGScheduler: running: Set()
16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10, ResultStage 11)
16/09/10 00:11:03 INFO DAGScheduler: failed: Set()
16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10 (UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which has no missing parents
16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)
16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic, partition 2 offsets 1 -> 2
16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for spark-executor-testgid log-analysis-topic 2 1 *(fetch the same EVENT 2nd time)*
16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job 147349146 ms.0 from job set of time 147349146 ms
16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time 147349146 ms (execution: 10.874 s) *(EVENT 1st-time processing cost 10.874 s)*
16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job 1473491465000 ms.0 from job set of time 1473491465000 ms
16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time 1473491465000 ms (execution: 0.066 s) *(EVENT 2nd-time processing cost 0.066 s)*

The 2nd-time processing of the event finished without really doing the work.

2016-09-08 14:55 GMT-07:00 Cody Koeninger:
> - If you're seeing repeated attempts to process the same message, you
> should be able to look in the UI or logs and see that a task has
> failed. Figure out why that task failed before chasing other things [...]
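Regarding point 1 above, here is a sketch of what the 0.10 integration looks like, assuming the spark-streaming-kafka-0-10 artifact (which targets Spark 2.0) and a placeholder broker, group id, and topic. It also addresses question 2 from the original post: offsets are committed per batch, after processing, via CanCommitOffsets. This follows the connector's documented pattern rather than the poster's actual code.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.CanCommitOffsets;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.HasOffsetRanges;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import org.apache.spark.streaming.kafka010.OffsetRange;

    public class DirectStream010 {
      public static void main(String[] args) throws Exception {
        JavaStreamingContext jsc = new JavaStreamingContext(
            new SparkConf().setAppName("direct-0-10").setMaster("local[*]"),
            Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092"); // assumed broker
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "testgid");
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false); // commit manually, below

        JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                jsc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(
                    Collections.singleton("log-analysis-topic"), kafkaParams));

        stream.foreachRDD(rdd -> {
          // Grab the batch's offset ranges before doing anything else.
          OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

          rdd.foreach(record -> {
            // lengthy per-record processing goes here
          });

          // Commit offsets back to Kafka only after the batch succeeded:
          // at-least-once semantics, per batch rather than per message.
          ((CanCommitOffsets) stream.inputDStream()).commitAsync(ranges);
        });

        jsc.start();
        jsc.awaitTermination();
      }
    }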
Re: spark streaming kafka connector questions
- If you're seeing repeated attempts to process the same message, you should be able to look in the UI or logs and see that a task has failed. Figure out why that task failed before chasing other things.

- You're not using the latest version; the latest version is for Spark 2.0. There are two versions of the connector for Spark 2.0: one for Kafka 0.8 or higher, and one for Kafka 0.10 or higher.

- Committing individual messages to Kafka doesn't make any sense; Spark Streaming deals with batches. If you're doing any aggregations that involve shuffling, there isn't even a guarantee that you'll process messages in order for a given topicpartition.

- Auto commit has no effect for the 0.8 version of createDirectStream. Turning it on for the 0.10 version of createDirectStream is a really bad idea: it will give you undefined delivery semantics, because the commit to Kafka is unrelated to whether the batch processed successfully.

If you're unclear on how the Kafka integration works, see
https://github.com/koeninger/kafka-exactly-once

On Thu, Sep 8, 2016 at 4:44 PM, Cheng Yi wrote:
> I am using the latest streaming Kafka connector:
>
> <groupId>org.apache.spark</groupId>
> <artifactId>spark-streaming-kafka_2.11</artifactId>
> <version>1.6.2</version>
>
> I am facing the problem that a message is delivered two times to my
> consumers. The two deliveries are 10+ seconds apart; it looks like this is
> caused by my lengthy message processing (about 60 seconds). I tried to
> solve this, but I am still stuck.
>
> 1. It looks like the Kafka streaming connector supports Kafka v0.8 and
> maybe v0.9, but not v0.10:
>
> JavaPairInputDStream<String, String> ds = KafkaUtils.createDirectStream(jsc,
>     String.class, String.class,
>     StringDecoder.class, StringDecoder.class,
>     kafkaParams, topicsSet);
>
> 2. After I get the message from Kafka streaming via the consumer, how can I
> commit the message without finishing the whole processing (the whole
> processing might take minutes)? It looks like I can't get the consumer from
> KafkaUtils to execute the Kafka commit API.
>
> 3. If I can't do the manual commit, then I need to tell the Kafka consumer
> to allow a longer session, or to auto commit. For v0.8 or v0.9, I have
> tried passing the following properties to KafkaUtils:
>
> kafkaParams.put("auto.commit.enable", "true");
> kafkaParams.put("auto.commit.interval.ms", "1000");
> kafkaParams.put("zookeeper.session.timeout.ms", "6");
> kafkaParams.put("zookeeper.connection.timeout.ms", "6");
>
> Still not working.
> Help is greatly appreciated!
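A closing note on the configuration in the quoted question: with the 0.8 createDirectStream there is no ZooKeeper-based consumer group, so the zookeeper.* and auto.commit.* entries above are ignored, and lengthy processing cannot time out a session; Spark itself tracks the offsets. A minimal, hedged parameter set (assumed broker address and checkpoint path), with checkpointing for driver-restart recovery, would look like:

    import java.util.HashMap;
    import java.util.Map;

    // Only the broker list (and optionally where to start) matters for the
    // 0.8 direct stream; offsets are tracked by Spark, not a consumer group.
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "localhost:9092"); // assumed broker
    kafkaParams.put("auto.offset.reset", "smallest");          // read from earliest

    // For recovery of tracked offsets across driver restarts, checkpoint the
    // streaming context instead of relying on Kafka-side commits.
    // 'jsc' is the JavaStreamingContext; the path is hypothetical.
    jsc.checkpoint("/tmp/spark-checkpoint");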