Hi David,

I think everything is good, but you are missing one statement:

    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

Add it after the line config.setDebug(true);
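For context, Storm's Config is a map from string keys to values, and as far as I know Config.TOPOLOGY_MAX_SPOUT_PENDING is the key "topology.max.spout.pending", which caps how many un-acked tuples each spout task may have in flight. Here is a rough sketch of what the config should contain once the line is added. It uses a plain HashMap as a stand-in for Config so it runs without Storm on the classpath; the class name MaxSpoutPendingSketch, the buildConfig helper, and the local key constants are hypothetical names for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in sketch: a plain HashMap plays the role of Storm's Config here.
class MaxSpoutPendingSketch {
    // Assumed string values behind Config.TOPOLOGY_DEBUG and
    // Config.TOPOLOGY_MAX_SPOUT_PENDING in Storm.
    static final String TOPOLOGY_DEBUG = "topology.debug";
    static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";

    // Hypothetical helper that mirrors the order suggested in the reply:
    // enable debug first, then limit the pending tuples to 1.
    static Map<String, Object> buildConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(TOPOLOGY_DEBUG, true);          // stands in for config.setDebug(true)
        config.put(TOPOLOGY_MAX_SPOUT_PENDING, 1); // the statement that was missing
        return config;
    }

    public static void main(String[] args) {
        // Print the resulting settings so they can be checked by eye.
        System.out.println(buildConfig());
    }
}
```

In the real topology you would keep using the Storm Config object as it is and only add the one put call after config.setDebug(true).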
*Best regards,*
*K.Sai Dilip Reddy.*

On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <[email protected]> wrote:

> Hi all,
>
> I am currently trying to use TestTopologyStaticHosts to connect the
> KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance
> running on my localhost. I have a topic named "twitter-topic" that has
> some tweets in it. This is all working as expected. I can run the
> consumer in the terminal and it returns the tweets. I want to use the
> KafkaSpout to connect to the Kafka topic and pull the tweets into a
> topology. I have been working on this for a few days now with no success.
>
> So far I have learned that when Storm is run in local mode it uses an
> in-memory ZooKeeper on port 2000, which would not allow it to connect to
> the Kafka topic. I have tried to get around this using the following
> syntax that I found online:
>
>     LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>
> It is still not working, but it seems to be connecting to Kafka, as it
> gives a 'closed socket connection' message when I cancel the operation
> (after it does not work and hangs open). The Storm output also says that
> it is connected to localhost 2181, so it seems to be getting that far. I
> have included the full output from Storm in a txt file attached.
>
> Here is the code I am using in the TestTopologyStaticHosts class:
>
>     public static void main(String[] args) throws Exception {
>
>         //String zkConnString = "localhost:2181";
>
>         GlobalPartitionInformation hostsAndPartitions =
>                 new GlobalPartitionInformation();
>         hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
>         BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
>         // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");
>
>         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts,
>                 "twitter-topic", "/twitter", "twitter-topic-id");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         //kafkaConfig.forceStartOffsetTime(-2);
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
>         builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
>         LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>         Config config = new Config();
>         config.setDebug(true);
>         // config.put("storm.zookeeper.servers", "localhost");
>         // config.put("storm.zookeeper.port", "2181");
>         cluster.submitTopology("kafka-test", config,
>                 builder.createTopology());
>
>         Thread.sleep(600000);
>
>     }
>
> Judging by the output, it seems that there is a problem with connecting
> to the Kafka partitions. I have tried many different things to get it to
> work, but no luck. I have also been looking at using the
> KafkaSpoutTestTopology class, but it is expecting arguments including
> 'dockerIp', which I don't understand.
>
> Should I be using Storm in local mode?
> Should I be using the TestTopologyStaticHosts class, or would the
> KafkaSpoutTestTopology class be better?
>
> Any help at all would be greatly appreciated because I am really stuck.
>
> Kind Regards
> David Kavanagh
