Hi,

I am trying to run a simple program that reads messages from Kafka topic using 
storm-kafka spout. Here is the snip of the simple program ->

           List<String> hosts = new ArrayList<String>();
      hosts.add("127.0.0.1:9092");//Kafka test server running on this port
      SpoutConfig kafkaConf = new 
SpoutConfig(StaticHosts.fromHostString(hosts,4), KafkaSetup.TEST_TOPIC, 
"/kafkastorm", "discovery");
               kafkaConf.forceStartOffsetTime(-2);
               kafkaConf.zkServers = ImmutableList.of("127.0.0.1");
              kafkaConf.zkPort = 2181;
          kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
           KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf);

              TopologyBuilder builder = new TopologyBuilder();
                  
          builder.setSpout("spout", kafkaSpout);
            builder.setBolt("printer", new PrinterBolt())
                     .shuffleGrouping("spout");
                         
                 LocalCluster cluster = new LocalCluster();
                        cluster.submitTopology("SimpleTopology", conf, 
builder.createTopology());

When creating the Storm-Kafka spout, the program exit with the following error 
->


309 [Thread-28-spout] INFO  backtype.storm.daemon.executor - invoke  - 
Activating spout spout:(4)
6317 [kafka-request-handler-4] WARN  kafka.server.KafkaApis - warn  - 
[KafkaApi-0] Fetch request with correlation id 0 from client  on partition 
[test-topic,3] failed due to Partition [test-topic,3] doesn't exist on 0
6329 [Thread-28-spout] ERROR storm.kafka.KafkaUtils - fetchMessages  - Received 
error while fetching messages from topic: test-topicon partition: 3
6330 [Thread-28-spout] ERROR backtype.storm.util - invoke  - Async loop died!
java.lang.NullPointerException
    at storm.kafka.PartitionManager.fill(PartitionManager.java:138)
    at storm.kafka.PartitionManager.next(PartitionManager.java:108)
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:117)
    at 
backtype.storm.daemon.executor$fn__1136$fn__1151$fn__1180.invoke(executor.clj:547)
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:403)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:619)

Am I doing anything wrong here? 

Thanks,
Saurabh.


PS:

here are versions of different components

storm version 0.9.0.1

zookeeper version 3.4.5

Kafka version 0.8.0-beta1

Kafka Spout version storm-kafka-0.9.0-wip16a ( with patch to work with Kafka 
0.8)


Reply via email to