Correction: the default number of Kafka partitions is 1, not 10.
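
To illustrate the mismatch this implies, here is a minimal, self-contained sketch. It is not storm-kafka code: the class and method names (PartitionCheck, missingPartitions) are hypothetical, and it assumes the spout addresses partition ids 0..n-1 when n partitions per host are configured, which is consistent with the "Partition [test-topic,3] doesn't exist" line in the quoted log. With 4 passed to StaticHosts.fromHostString and a topic created with the default of 1 partition, ids 1 to 3 would have no counterpart on the broker.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionCheck {
    // Returns the partition ids a consumer would request but the topic lacks,
    // assuming ids run from 0 to configured-1 (Kafka partition ids are zero-based).
    public static List<Integer> missingPartitions(int configured, int actual) {
        List<Integer> missing = new ArrayList<Integer>();
        for (int id = actual; id < configured; id++) {
            missing.add(id);
        }
        return missing;
    }

    public static void main(String[] args) {
        // 4 is the partitions-per-host value in the quoted snippet;
        // 1 is the Kafka default mentioned above.
        System.out.println(missingPartitions(4, 1)); // prints [1, 2, 3]
    }
}
```

If this is indeed the cause, either the topic would need to be created with 4 partitions or the value passed to StaticHosts.fromHostString would need to match the topic's actual partition count.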

On Fri, Feb 21, 2014 at 1:22 PM, bijoy deb <[email protected]> wrote:

> Hi,
>
> It looks like it is not able to find any topic named *test-topic*. Can you
> check and confirm whether a topic named *test-topic* actually exists on Kafka?
>
> Thanks
> Bijoy
>
>
> On Fri, Feb 21, 2014 at 5:14 PM, Saurabh Agarwal (BLOOMBERG/ 731 LEXIN) <
> [email protected]> wrote:
>
>> Hi,
>>
>> I am trying to run a simple program that reads messages from a Kafka topic
>> using the storm-kafka spout. Here is a snippet of the program ->
>>
>> List<String> hosts = new ArrayList<String>();
>> hosts.add("127.0.0.1:9092"); // Kafka test server running on this port
>> SpoutConfig kafkaConf = new SpoutConfig(
>>     StaticHosts.fromHostString(hosts, 4), KafkaSetup.TEST_TOPIC,
>>     "/kafkastorm", "discovery");
>> kafkaConf.forceStartOffsetTime(-2);
>> kafkaConf.zkServers = ImmutableList.of("127.0.0.1");
>> kafkaConf.zkPort = 2181;
>> kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>> KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf);
>>
>> TopologyBuilder builder = new TopologyBuilder();
>> builder.setSpout("spout", kafkaSpout);
>> builder.setBolt("printer", new PrinterBolt())
>>     .shuffleGrouping("spout");
>> LocalCluster cluster = new LocalCluster();
>> cluster.submitTopology("SimpleTopology", conf, builder.createTopology());
>>
>> When creating the storm-kafka spout, the program exits with the following
>> error ->
>>
>>
>> 309 [Thread-28-spout] INFO backtype.storm.daemon.executor - invoke -
>> Activating spout spout:(4)
>> 6317 [kafka-request-handler-4] WARN kafka.server.KafkaApis - warn -
>> [KafkaApi-0] Fetch request with correlation id 0 from client on partition
>> [test-topic,3] failed due to Partition [test-topic,3] doesn't exist on 0
>> 6329 [Thread-28-spout] ERROR storm.kafka.KafkaUtils - fetchMessages -
>> Received error while fetching messages from topic: test-topicon partition: 3
>> 6330 [Thread-28-spout] ERROR backtype.storm.util - invoke - Async loop
>> died!
>> java.lang.NullPointerException
>> at storm.kafka.PartitionManager.fill(PartitionManager.java:138)
>> at storm.kafka.PartitionManager.next(PartitionManager.java:108)
>> at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:117)
>> at backtype.storm.daemon.executor$fn__1136$fn__1151$fn__1180.invoke(executor.clj:547)
>> at backtype.storm.util$async_loop$fn__458.invoke(util.clj:403)
>> at clojure.lang.AFn.run(AFn.java:24)
>> at java.lang.Thread.run(Thread.java:619)
>>
>> Am I doing anything wrong here?
>>
>> Thanks,
>> Saurabh.
>>
>>
>> PS:
>>
>> Here are the versions of the different components:
>>
>> storm version 0.9.0.1
>>
>> zookeeper version 3.4.5
>>
>> Kafka version 0.8.0-beta1
>>
>> Kafka Spout version storm-kafka-0.9.0-wip16a (with a patch to work with
>> Kafka 0.8)
>>
>>
>>
>>
>


-- 
With Regards,
Vinoth Kumar K
