Show the output of bin/kafka-topics.sh --list.  Show the actual code with
the topic name hardcoded in the set, not loaded from an external file you
didn't show.  Show the full stacktrace you're getting.

On Mon, Sep 28, 2015 at 10:03 PM, Ratika Prasad <rpra...@couponsinc.com>
wrote:

> Yes, the topics are created and get listed, and I have also posted a few
> messages, which I am able to read using kafka-console-consumer.sh
> --from-beginning. Yet Spark still fails with "Couldn't find leader offsets
> for Set".
>
> I also tried changing offsets.storage from zookeeper to kafka.
>
> Kindly help.
>
> _____________________________
> From: Cody Koeninger <c...@koeninger.org>
> Sent: Tuesday, September 29, 2015 12:33 am
> Subject: Re: Spark-Kafka Connector issue
> To: Ratika Prasad <rpra...@couponsinc.com>
> Cc: <user@spark.apache.org>
>
>
>
> Did you actually create TestTopic?  See if it shows up using
> bin/kafka-topics.sh --list, and if not, create it using bin/kafka-topics.sh
> --create
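>
> For example (a sketch; adjust the ZooKeeper address, partition count, and
> replication factor to your setup):
>
>   bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TestTopic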
>
> On Mon, Sep 28, 2015 at 1:20 PM, Ratika Prasad <rpra...@couponsinc.com>
> wrote:
>
>> Thanks for your reply.
>>
>> I invoked my program with the broker IP and port, and it started as
>> expected, but I see the error below:
>>
>> ./bin/spark-submit --class
>> org.stream.processing.JavaKafkaStreamEventProcessing --master local
>> spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>> 172.28.161.32:9092 TestTopic
>>
>> 15/09/28 17:45:09 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>>
>> 15/09/28 17:45:11 WARN StreamingContext: spark.master should be set as
>> local[n], n > 1 in local mode if you have receivers to get data, otherwise
>> Spark jobs will not get resources to process the received data.
>>
>> Exception in thread "main" org.apache.spark.SparkException:
>> java.nio.channels.ClosedChannelException
>> org.apache.spark.SparkException: Couldn't find leader offsets for Set([TestTopic,0])
>>         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>         at scala.util.Either.fold(Either.scala:97)
>>         at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
>>         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
>>         at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
>>         at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> When I ran the command below to check the offsets, I got this:
>>
>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic
>> TestTopic --group test-consumer-group --zookeeper localhost:2181
>>
>> Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
>> KeeperErrorCode = NoNode for
>> /consumers/test-consumer-group/offsets/TestTopic/0.
>>
>> Also, I just added the configs below to my kafka/config/consumer.properties
>> and restarted Kafka:
>>
>> auto.offset.reset=smallest
>> offsets.storage=zookeeper
>> offsets.channel.backoff.ms=1000
>> offsets.channel.socket.timeout.ms=10000
>> offsets.commit.max.retries=5
>> dual.commit.enabled=true
>>
>> From: Cody Koeninger [mailto:c...@koeninger.org]
>> Sent: Monday, September 28, 2015 7:56 PM
>> To: Ratika Prasad <rpra...@couponsinc.com>
>> Cc: d...@spark.apache.org
>> Subject: Re: Spark-Kafka Connector issue
>>
>> This is a user list question, not a dev list question.
>>
>> Looks like your driver is having trouble communicating with the Kafka
>> brokers. Make sure the broker host and port are reachable from the driver
>> host (using nc or telnet); make sure you're providing the _broker_ host
>> and port to createDirectStream, not the ZooKeeper host; and make sure the
>> topics in question actually exist on Kafka and that the names match what
>> you're providing to createDirectStream.
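>>
>> For example, from the driver host you can sanity-check connectivity with
>> something like "nc -vz 172.28.161.32 9092", and in the program the broker
>> list should name the Kafka port (9092 by default), not ZooKeeper's 2181:
>>
>>   // broker host:port, not the ZooKeeper address
>>   kafkaParams.put("metadata.broker.list", "172.28.161.32:9092");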
>>
>> On Sat, Sep 26, 2015 at 11:50 PM, Ratika Prasad <rpra...@couponsinc.com>
>> wrote:
>>
>> Hi All,
>>
>> I am trying out Spark Streaming, reading messages from Kafka topics that
>> are then turned into streams, as shown below. I have Kafka set up on a VM
>> and the topics created; however, when I run the program below from my
>> Spark VM, I get an error, even though the Kafka server and ZooKeeper are
>> up and running:
>>
>> ./bin/spark-submit --class
>> org.stream.processing.JavaKafkaStreamEventProcessing --master local
>> spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>> 172.28.161.32:2181 redemption_inbound
>>
>> Exception in thread "main" org.apache.spark.SparkException:
>> java.io.EOFException: Received -1 when reading from channel, socket has
>> likely been closed.
>>         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>         at scala.util.Either.fold(Either.scala:97)
>>         at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
>>         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
>>         at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
>>         at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> Program:
>>
>> package org.stream.processing;
>>
>> import java.util.Arrays;
>> import java.util.HashMap;
>> import java.util.HashSet;
>> import java.util.regex.Pattern;
>>
>> import com.google.common.collect.Lists;
>> import kafka.serializer.StringDecoder;
>> import scala.Tuple2;
>>
>> import org.apache.spark.SparkConf;
>> import org.apache.spark.api.java.function.FlatMapFunction;
>> import org.apache.spark.api.java.function.Function;
>> import org.apache.spark.api.java.function.Function2;
>> import org.apache.spark.api.java.function.PairFunction;
>> import org.apache.spark.streaming.Durations;
>> import org.apache.spark.streaming.api.java.JavaDStream;
>> import org.apache.spark.streaming.api.java.JavaPairDStream;
>> import org.apache.spark.streaming.api.java.JavaPairInputDStream;
>> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>> import org.apache.spark.streaming.kafka.KafkaUtils;
>>
>> public class JavaKafkaStreamEventProcessing {
>>
>>   // Assumed definition; the SPACE pattern was referenced but not shown in the original mail
>>   private static final Pattern SPACE = Pattern.compile(" ");
>>
>>   public static void main(String[] args) {
>>     if (args.length < 2) {
>>       System.err.println("Usage: DirectKafkaWordCount <brokers> <topics> " +
>>           "  <brokers> is a list of one or more Kafka brokers " +
>>           "  <topics> is a list of one or more kafka topics to consume from ");
>>       System.exit(1);
>>     }
>>
>>     String brokers = args[0];
>>     String topics = args[1];
>>
>>     // Create context with 2 second batch interval
>>     SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStreamEventProcessing");
>>     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
>>
>>     HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>>     HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>     kafkaParams.put("metadata.broker.list", brokers);
>>
>>     // Create direct kafka stream with brokers and topics
>>     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>         jssc,
>>         String.class,
>>         String.class,
>>         StringDecoder.class,
>>         StringDecoder.class,
>>         kafkaParams,
>>         topicsSet
>>     );
>>
>>     // Get the lines, split them into words, count the words and print
>>     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
>>       public String call(Tuple2<String, String> tuple2) {
>>         return tuple2._2();
>>       }
>>     });
>>     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
>>       public Iterable<String> call(String x) {
>>         return Lists.newArrayList(SPACE.split(x));
>>       }
>>     });
>>     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>>       new PairFunction<String, String, Integer>() {
>>         public Tuple2<String, Integer> call(String s) {
>>           return new Tuple2<String, Integer>(s, 1);
>>         }
>>       }).reduceByKey(
>>         new Function2<Integer, Integer, Integer>() {
>>           public Integer call(Integer i1, Integer i2) {
>>             return i1 + i2;
>>           }
>>         });
>>     wordCounts.print();
>>     System.out.println("Word Counts are : " + wordCounts.toString());
>>
>>     // Start the computation
>>     jssc.start();
>>     jssc.awaitTermination();
>>   }
>> }