Spark Streaming KafkaUtils Issue
Hi Folks,

I am seeing some strange behavior when using the Spark Kafka connector in Spark Streaming. I have a Kafka topic with 8 partitions, and a Kafka producer that pumps messages into this topic. On the consumer side I have a Spark Streaming application with 8 executors on 8 worker nodes and 8 ReceiverInputDStreams, all using the same Kafka group id, connected to the topic's 8 partitions. The Kafka consumer property auto.offset.reset is set to "smallest".

Here is the sequence of steps: (1) Start the Spark Streaming app. (2) Start the producer. At this point I see the messages pumped by the producer arriving in Spark Streaming. Then I (1) stopped the producer, (2) waited for all the messages to be consumed, and (3) stopped the Spark Streaming app.

Now when I restart the Spark Streaming app (note: the producer is still down and no messages are being pumped into the topic), I observe that Spark Streaming starts reading each partition right from the beginning. This is not what I was expecting. I expected the consumers started by Spark Streaming to pick up where they left off. Is my assumption incorrect that the consumers (the Kafka/Spark connector) should resume reading the topic from where they last left off? Has anyone else seen this behavior? Is there a way to make it start from where it left off?

Regards,
- Abraham
Re: Spark Streaming KafkaUtils Issue
Would you mind sharing the code leading up to your createStream? Are you also setting group.id?

Thanks,
Sean

On Oct 10, 2014, at 4:31 PM, Abraham Jacob <abe.jac...@gmail.com> wrote: [...]
Re: Spark Streaming KafkaUtils Issue
Sure... I do set the group.id for all the consumers to be the same. Here is the code ---

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");

Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);

List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(
        KafkaUtils.createStream(jssc, byte[].class, String.class,
                DefaultDecoder.class, PayloadDeSerializer.class,
                kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
            .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {
                private static final long serialVersionUID = -1936810126415608167L;
                public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
                    return tuple2;
                }
            }));
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    unifiedStream = kafkaStreams.get(0);
}
unifiedStream.print();

jssc.start();
jssc.awaitTermination();

-abe

On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara <sean.mcnam...@webtrends.com> wrote: [...]
Re: Spark Streaming KafkaUtils Issue
How long do you let the consumers run for? Is it less than 60 seconds by chance? auto.commit.interval.ms defaults to 60000 (60 seconds). If so, that may explain why you are seeing that behavior.

Cheers,
Sean

On Oct 10, 2014, at 4:47 PM, Abraham Jacob <abe.jac...@gmail.com> wrote: [...]
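If the commit interval were the culprit, one mitigation would be to commit offsets more frequently. This is just a sketch: auto.commit.interval.ms and auto.commit.enable are standard Kafka 0.8 high-level consumer properties, but the 5-second value and the ZooKeeper address here are arbitrary placeholders, not from the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: shorten the offset auto-commit interval so short-lived consumers
// still record their progress in ZooKeeper before shutting down.
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", "zkhost:2181");  // placeholder address
kafkaConf.put("group.id", "consumerGrp");
kafkaConf.put("auto.commit.enable", "true");        // on by default in 0.8
kafkaConf.put("auto.commit.interval.ms", "5000");   // commit every 5s instead of 60s
```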
RE: Spark Streaming KafkaUtils Issue
Hi Abraham,

You are correct, the "auto.offset.reset" behavior in KafkaReceiver is different from Kafka's original semantics. If you set this configuration, KafkaReceiver will clean the related ZK metadata immediately, but for Kafka this configuration is just a hint, which takes effect only when the offset is out of range. So you will always read data from the beginning if you set it to "smallest"; likewise, if you set it to "largest", you will always get data from the end immediately. There is a JIRA and a PR to follow on this, but it is still not merged to master; you can check it out (https://issues.apache.org/jira/browse/SPARK-2492).

Thanks
Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com]
Sent: Saturday, October 11, 2014 6:57 AM
To: Sean McNamara
Cc: user@spark.apache.org
Subject: Re: Spark Streaming KafkaUtils Issue

Probably this is the issue - http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/

"Spark's usage of the Kafka consumer parameter auto.offset.reset is different from Kafka's semantics. In Kafka, the behavior of setting auto.offset.reset to 'smallest' is that the consumer will automatically reset the offset to the smallest offset when a) there is no existing offset stored in ZooKeeper or b) there is an existing offset but it is out of range. Spark however will *always* remove existing offsets and then start all the way from zero again. This means whenever you restart your application with auto.offset.reset = 'smallest', your application will completely re-process all available Kafka data. Doh! See this discussion (http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html) and that discussion (http://markmail.org/message/257a5l3oqyftsjxj)."

Hmm interesting... Wondering what happens if I set it as largest...?
Re: Spark Streaming KafkaUtils Issue
Thanks Jerry,

So, from what I can understand from the code, if I leave out auto.offset.reset, it should theoretically read from the last commit point... Correct?

-abe

On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai <saisai.s...@intel.com> wrote: [...]
Re: Spark Streaming KafkaUtils Issue
This jira and comment sums up the issue: https://issues.apache.org/jira/browse/SPARK-2492?focusedCommentId=14069708&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14069708

Basically the offset param was renamed and had slightly different semantics between Kafka 0.7 and 0.8. It was also useful because earlier versions of the Spark Streaming receiver could be overwhelmed after a streaming job had been down for a period of time. I think this PR addresses the issue quite nicely: https://github.com/apache/spark/pull/1420

Best,
Sean

On Oct 10, 2014, at 6:48 PM, Abraham Jacob <abe.jac...@gmail.com> wrote: [...]
RE: Spark Streaming KafkaUtils Issue
Hi Abe,

You can see the details in KafkaInputDStream.scala; here is the snippet:

// When auto.offset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
  tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}

KafkaReceiver will check your kafkaParams: if "auto.offset.reset" is set, it will clean the ZK metadata immediately, so you will always read data from the beginning (set to "smallest") or the end (set to "largest") immediately, because the ZK metadata has been deleted beforehand. If you do not set this parameter, this code path will not be triggered, so data will be read from the last commit point. And if the last commit point is not yet available, Kafka will move the offset to the end of the partition (Kafka sets "auto.offset.reset" to "largest" by default). If you want to keep the same semantics as Kafka, you need to remove the above code path manually and recompile Spark.

Thanks
Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com]
Sent: Saturday, October 11, 2014 8:49 AM
To: Shao, Saisai
Cc: user@spark.apache.org; Sean McNamara
Subject: Re: Spark Streaming KafkaUtils Issue
[...]
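In other words, the user-side fix Jerry describes is simply to not set the property at all. A hedged sketch of the consumer config (same property names as in Abraham's code, minus the reset hint; the ZooKeeper address is a placeholder):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: leave "auto.offset.reset" out entirely so KafkaReceiver never wipes
// the consumer group's ZooKeeper node, and the stream resumes from the last
// committed offset on restart.
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", "zkhost:2181");  // placeholder address
kafkaConf.put("group.id", "consumerGrp");
// No "auto.offset.reset" here. If no offset has ever been committed, the 0.8
// consumer falls back to "largest", i.e. it starts from the end of the partition.
```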
Re: Spark Streaming KafkaUtils Issue
Ah I see... much clearer now... Because auto.offset.reset will trigger KafkaReceiver to delete the ZK metadata, when control passes over to the Kafka consumer API it will see that there is no offset available for the partition. That then triggers the "smallest" or "largest" logic in Kafka, depending on what we set for auto.offset.reset...

Thanks for explaining this clearly! Appreciate your effort.

On Fri, Oct 10, 2014 at 6:08 PM, Shao, Saisai <saisai.s...@intel.com> wrote: [...]
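To summarize the thread, the offset-resolution behavior discussed above can be condensed into a tiny decision function. This is illustrative Java only, not Spark or Kafka source code; it just encodes the three cases Jerry and Sean described for the Spark 1.x KafkaReceiver with the Kafka 0.8 high-level consumer:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetBehavior {
    /**
     * Models where a KafkaUtils.createStream receiver starts reading,
     * per the behavior described in this thread.
     */
    static String startingPoint(Map<String, String> kafkaParams, boolean offsetCommitted) {
        if (kafkaParams.containsKey("auto.offset.reset")) {
            // KafkaReceiver deletes the group's ZK node first, so any stored
            // offset never applies; the reset value always wins.
            return kafkaParams.get("auto.offset.reset").equals("smallest")
                    ? "beginning" : "end";
        }
        // Plain Kafka semantics: use the committed offset if one exists,
        // otherwise fall back to the consumer default ("largest" => end).
        return offsetCommitted ? "last commit point" : "end";
    }

    public static void main(String[] args) {
        Map<String, String> withReset = new HashMap<String, String>();
        withReset.put("auto.offset.reset", "smallest");
        Map<String, String> noReset = new HashMap<String, String>();

        System.out.println(startingPoint(withReset, true));  // beginning (re-reads everything)
        System.out.println(startingPoint(noReset, true));    // last commit point
        System.out.println(startingPoint(noReset, false));   // end
    }
}
```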