Hi david,

I think everything is good but you are missing a statement
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); add it after the line
config.setDebug(true);



*Best regards,*
*K.Sai Dilip Reddy.*

On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <[email protected]>
wrote:

> Hi all,
>
> I am currently trying use TestTopologyStaticHosts to try connect the
> KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance
> running on my localhost. I have a topic named "twitter-topic" that has some
> tweets in it. This is all working as expected. I can run the consumer in
> the terminal and it returns the tweets. I want to use the KafkaSpout to
> connect to the Kafka topic and pull the tweets into a topology. I have
> been working on this a few days now and no success.
>
> So far i have learned that when Storm is run in local mode that it uses an
> in memory zookeeper on port 2000, which would not allow it to connect to
> the Kafka topic. I have tried to get around this using the following syntax
> that i found online:
>
> LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>
> It is still not working but it seems to be connecting to Kafka as it gives
> a 'closed socket connection' message when i cancel the operation (after it
> does not work and hangs open). It also says in the storm output that it is
> connected to localhost 2181 so it seems to be getting that far. I have
> included the full output from Storm in a txt file attached.
>
> Here is the code i am using in the TestTopologyStaticHosts class:
>
>  public static void main(String[] args) throws Exception {
>
>         //String zkConnString = "localhost:2181";
>
>         GlobalPartitionInformation hostsAndPartitions = new
> GlobalPartitionInformation();
>         hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
>         BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
>         // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");
>
>         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts,
> "twitter-topic","/twitter","twitter-topic-id");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         //kafkaConfig.forceStartOffsetTime(-2);
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
>         builder.setBolt("print", new
> PrinterBolt()).shuffleGrouping("words");
>         LocalCluster cluster = new LocalCluster("localhost", new
> Long(2181));
>         Config config = new Config();
>         config.setDebug(true);
>         // config.put("storm.zookeeper.servers", "localhost");
>         // config.put("storm.zookeeper.port", "2181");
>         cluster.submitTopology("kafka-test", config,
> builder.createTopology());
>
>         Thread.sleep(600000);
>
>     }
>
> Judging by the output it seems that there is a problem with connecting to
> the Kafka partitions.
> I have tried many different things to get it to work but no luck. I have
> also been looking at using the KafkaSpoutTestTopology class but it is
> expecting arguments including 'dockerIp' which i don't understand.
>
> Should i be using Storm in localmode?
> Should i be using the TestTopologyStaticHosts class or would the 
> KafkaSpoutTestTopology
> class be better?
>
> Any help at all would be greatly appreciated because i am really stuck.
>
> Kind Regards
> David Kavanagh
>
>

Reply via email to