I have 10 partitions on a topic of Kafka, but when trying to read the
following documentation
<https://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka.html> Storm
got the error below. I tried it with Storm core and with Trident, the error
is always the same.
int numPartition = 10;
Broker broker = new Broker(Configuration.BOOTSTRAP_SERVERS_CONFIG);
GlobalPartitionInformation info = new GlobalPartitionInformation(topicName);
for (int i = 0; i < numPartition; i++) {
info.addPartition(i, broker);
}
StaticHosts hosts = new StaticHosts(info);
SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topicName, "/brokers",
UUID.randomUUID().toString());
/* http://goo.gl/riljni | http://goo.gl/37ZUuV */
kafkaSpoutConfig.ignoreZkOffsets = true;
kafkaSpoutConfig.startOffsetTime = -1;
kafkaSpoutConfig.scheme = new org.apache.storm.spout.SchemeAsMultiScheme(new
StringScheme());
java.nio.channels.ClosedChannelException at
kafka.network.BlockingChannel.send(BlockingChannel.scala:110) at
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149)
at
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) at
org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) at
org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) at
org.apache.storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:35)
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:83) at
org.apache.storm.daemon.executor$fn__7990$fn__8005.invoke(executor.clj:604)
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482) at
clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745)
--
Thomas Cristanis