Re: Not all KafkaReceivers processing the data Why?
Sure, the partitions exist, but is there data in all partitions? Try the Kafka offset checker:

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 -group -topic

On Wed, Sep 14, 2016 at 1:00 PM, wrote:
> Sure, thanks. I have removed dev. I do see all the partitions created correctly on the Kafka side:
>
> Topic:CEQReceiver PartitionCount:5 ReplicationFactor:1 Configs:
> Topic: CEQReceiver Partition: 0 Leader: 90 Replicas: 90 Isr: 90
> Topic: CEQReceiver Partition: 1 Leader: 86 Replicas: 86 Isr: 86
> Topic: CEQReceiver Partition: 2 Leader: 87 Replicas: 87 Isr: 87
> Topic: CEQReceiver Partition: 3 Leader: 88 Replicas: 88 Isr: 88
> Topic: CEQReceiver Partition: 4 Leader: 89 Replicas: 89 Isr: 89
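Besides the offset checker, a rough way to answer "is there data in all partitions?" is to see how much each partition holds on disk. The sketch below assumes the usual Kafka log layout, where each partition lives in a `<topic>-<partition>` directory under one of the broker's `log.dirs` and stores messages in `*.log` segment files; the class and method names are hypothetical. With replication factor 1, as in the listing above, each partition exists on only one broker (ids 86 to 90), so you would run it against every broker's log directory and merge the results.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

class PartitionSizes {
    // Returns partition number -> total bytes of *.log segment files, for every
    // "<topic>-<partition>" directory directly under logDir (one broker log dir).
    // Assumes the standard Kafka on-disk layout; index files are ignored.
    static Map<Integer, Long> sizesFor(Path logDir, String topic) throws IOException {
        Map<Integer, Long> sizes = new TreeMap<>();
        String prefix = topic + "-";
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(logDir)) {
            for (Path dir : dirs) {
                String name = dir.getFileName().toString();
                if (!Files.isDirectory(dir) || !name.startsWith(prefix)) {
                    continue;
                }
                String suffix = name.substring(prefix.length());
                // Only accept an all-digit suffix, so a directory such as
                // "CEQReceiver-x-0" is not mistaken for a partition of "CEQReceiver".
                if (!suffix.matches("\\d+")) {
                    continue;
                }
                long total = 0;
                try (DirectoryStream<Path> segments = Files.newDirectoryStream(dir, "*.log")) {
                    for (Path segment : segments) {
                        total += Files.size(segment);
                    }
                }
                sizes.put(Integer.parseInt(suffix), total);
            }
        }
        return sizes;
    }
}
```

A partition whose total stays at or near zero while another keeps growing is a strong hint that the producer is not spreading messages across partitions.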
Re: Not all KafkaReceivers processing the data Why?
Take a look at how the messages are actually distributed across the partitions. If the message keys have low cardinality, you might get a poor distribution (e.g. all the messages actually end up in only two of the five partitions, leading to what you see in Spark). If you look at the Kafka data directories, you can probably get an idea of the distribution just by examining the size of each partition.

Jeremy

On Wed, Sep 14, 2016 at 12:33 PM, Rachana Srivastava <rachana.srivast...@markmonitor.com> wrote:
> I have created a Kafka topic with 5 partitions. And I am using createStream receiver API like following. But somehow only one receiver is getting the input data.
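To see why low key cardinality matters: a keyed producer picks the partition from a hash of the key, so every message with the same key goes to the same partition, and with only two distinct keys at most two partitions can ever receive data. The sketch below uses `String.hashCode()` modulo the partition count as a hypothetical stand-in hash; it is not Kafka's exact partitioner, but the effect holds for any deterministic key hash. The class and method names are made up for illustration.

```java
import java.util.List;

class KeySkew {
    // Hypothetical stand-in for a key-hash partitioner: String.hashCode() modulo the
    // partition count. Deterministic, so equal keys always map to the same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Number of messages each partition would receive for the given message keys.
    static long[] countsPerPartition(List<String> keys, int numPartitions) {
        long[] counts = new long[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    // How many partitions receive at least one message.
    static int nonEmptyPartitions(long[] counts) {
        int nonEmpty = 0;
        for (long c : counts) {
            if (c > 0) {
                nonEmpty++;
            }
        }
        return nonEmpty;
    }
}
```

With 1,000 messages keyed only "a" or "b", this stand-in puts 500 messages in partition 2 and 500 in partition 3 and leaves the other three empty, while the five keys "key-0" to "key-4" happen to cover all five partitions.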
Re: Not all KafkaReceivers processing the data Why?
Have you checked your Kafka brokers to be certain that data is going to all 5 partitions? We use something very similar (but in Scala) and have no problems.

Also, you might not get the best response blasting both the user and dev lists like this. Normally you'd want to use 'user' only.

-Jeff
Not all KafkaReceivers processing the data Why?
Hello all,

I have created a Kafka topic with 5 partitions, and I am using the createStream receiver API as shown below. But somehow only one receiver is getting the input data; the rest of the receivers are not processing anything. Can you please help?

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    JavaPairDStream<String, String> messages = null;

    if (sparkStreamCount > 0) {
        // We create an input DStream for each partition of the topic, unify those
        // streams, and then repartition the unified stream.
        List<JavaPairDStream<String, String>> kafkaStreams =
                new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount);
        for (int i = 0; i < sparkStreamCount; i++) {
            kafkaStreams.add(KafkaUtils.createStream(jssc,
                    contextVal.getString(KAFKA_ZOOKEEPER),
                    contextVal.getString(KAFKA_GROUP_ID),
                    kafkaTopicMap));
        }
        messages = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
    } else {
        messages = KafkaUtils.createStream(jssc,
                contextVal.getString(KAFKA_ZOOKEEPER),
                contextVal.getString(KAFKA_GROUP_ID),
                kafkaTopicMap);
    }
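Why one receiver can end up with all the data: each createStream call starts a ZooKeeper-based high-level consumer, and in the code above all of them share the same group id, so the topic's partitions are split among the receivers rather than copied to each. The sketch below models a range-style assignment, which as far as I know is that consumer's default strategy; it assumes kafkaTopicMap maps the topic to one consumer thread, the consumer indices are abstract, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class RangeAssignmentSketch {
    // Splits partitions 0..numPartitions-1 into contiguous ranges over consumers
    // 0..numConsumers-1. The first (numPartitions % numConsumers) consumers get one
    // extra partition; if there are more consumers than partitions, the surplus
    // consumers get an empty list and sit idle.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        int perConsumer = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            int start = perConsumer * i + Math.min(i, extra);
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                owned.add(p);
            }
            assignment.add(owned);
        }
        return assignment;
    }

    // Returns the consumers that own at least one partition that actually holds data.
    static List<Integer> consumersSeeingData(List<List<Integer>> assignment,
                                             Set<Integer> nonEmptyPartitions) {
        List<Integer> consumers = new ArrayList<>();
        for (int i = 0; i < assignment.size(); i++) {
            for (int p : assignment.get(i)) {
                if (nonEmptyPartitions.contains(p)) {
                    consumers.add(i);
                    break;
                }
            }
        }
        return consumers;
    }
}
```

Under this model, assign(5, 5) gives each receiver exactly one partition, and if only partition 2 holds messages, consumersSeeingData returns just [2], which matches the symptom described above. Adding more receivers than partitions would only leave the extra ones idle.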