Hi David,

How many partitions does your topic have? The statement I gave you uses the default value. If you are running with a number of partitions, make sure you give the same number. For example, if you are running with two partitions, change the number to 2 in the statement:

config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
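To make the "more tasks than partitions (tasks: 10; partitions: 1)" warnings in the quoted Storm output easier to read, here is a minimal sketch of how spout tasks could be mapped to partitions. It assumes a simple round-robin assignment by task index. The class and method names are hypothetical and this is plain Java, not the storm-kafka source.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Hypothetical helper: partition ids handled by one spout task,
    // assuming round-robin assignment by zero-based task index.
    public static List<Integer> partitionsForTask(int taskIndex, int totalTasks, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int totalTasks = 10;   // parallelism hint passed to setSpout in the quoted code
        int numPartitions = 1; // only partition 0 is registered in the quoted code

        for (int task = 0; task < totalTasks; task++) {
            List<Integer> assigned = partitionsForTask(task, totalTasks, numPartitions);
            String status = assigned.isEmpty()
                    ? "no partitions assigned"
                    : "assigned partitions " + assigned;
            // Tasks are printed one-based, like the "Task [1/10]" lines in the log.
            System.out.println("Task [" + (task + 1) + "/" + totalTasks + "] " + status);
        }
    }
}
```

Under that assumption, only the first task gets partition 0 and the other nine stay idle, which matches the warnings. The idle tasks are harmless, so these warnings alone would not explain why no tweets arrive.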
*Best regards,*
*K.Sai Dilip Reddy.*

On Wed, Mar 30, 2016 at 3:00 PM, david kavanagh <[email protected]> wrote:

> Thanks for the reply!
>
> I added the line as you suggested, but unfortunately there is still no difference.
> I am just guessing at this stage, but judging by the output below, it seems to have something to do with the partitioning or the offset.
> The warnings start by stating that there are more tasks than partitions.
> Task 1 is assigned the partition that is created in the code (highlighted in green); the rest of the tasks are not assigned any partitions.
> Eventually it states 'Read partition information from: /twitter/twitter-topic-id/partition_0 --> null'
>
> So it seems like it is not reading data from Kafka at all. I really don't understand what is going on here.
> Any ideas?
>
> Kind Regards
>
> David
>
> --------------------------------------------------
>
> *Storm Output:*
>
> Thread-9-print] INFO backtype.storm.daemon.executor - Prepared bolt print:(2)
> 32644 [Thread-11-words] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
> 32685 [Thread-19-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-19-words] WARN storm.kafka.KafkaUtils - Task [5/10] no partitions assigned
> 32686 [Thread-17-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-15-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-17-words] WARN storm.kafka.KafkaUtils - Task [4/10] no partitions assigned
> 32686 [Thread-15-words] WARN storm.kafka.KafkaUtils - Task [3/10] no partitions assigned
> 32686 [Thread-11-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-11-words] INFO storm.kafka.KafkaUtils - Task [1/10] assigned [Partition{host=127.0.0.1:9092, partition=0}]
> 32687 [Thread-29-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32697 [Thread-19-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-25-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-29-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-13-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-27-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-15-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32689 [Thread-19-words] INFO backtype.storm.daemon.executor - Opened spout words:(7)
> 32689 [Thread-25-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32689 [Thread-15-words] INFO backtype.storm.daemon.executor - Opened spout words:(5)
> 32738 [Thread-25-words] WARN storm.kafka.KafkaUtils - Task [8/10] no partitions assigned
> 32738 [Thread-25-words] INFO backtype.storm.daemon.executor - Opened spout words:(10)
> 32689 [Thread-17-words] INFO backtype.storm.daemon.executor - Opened spout words:(6)
> 32688 [Thread-13-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32688 [Thread-21-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32739 [Thread-21-words] WARN storm.kafka.KafkaUtils - Task [6/10] no partitions assigned
> 32739 [Thread-21-words] INFO backtype.storm.daemon.executor - Opened spout words:(8)
> 32687 [Thread-27-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-27-words] WARN storm.kafka.KafkaUtils - Task [9/10] no partitions assigned
> 32740 [Thread-27-words] INFO backtype.storm.daemon.executor - Opened spout words:(11)
> 32687 [Thread-23-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-23-words] WARN storm.kafka.KafkaUtils - Task [7/10] no partitions assigned
> 32736 [Thread-29-words] WARN storm.kafka.KafkaUtils - Task [10/10] no partitions assigned
> 32742 [Thread-17-words] INFO backtype.storm.daemon.executor - Activating spout words:(6)
> 32872 [Thread-29-words] INFO backtype.storm.daemon.executor - Opened spout words:(12)
> 32742 [Thread-25-words] INFO backtype.storm.daemon.executor - Activating spout words:(10)
> 32742 [Thread-21-words] INFO backtype.storm.daemon.executor - Activating spout words:(8)
> 32742 [Thread-27-words] INFO backtype.storm.daemon.executor - Activating spout words:(11)
> 32742 [Thread-19-words] INFO backtype.storm.daemon.executor - Activating spout words:(7)
> 32741 [Thread-15-words] INFO backtype.storm.daemon.executor - Activating spout words:(5)
> 32740 [Thread-13-words] WARN storm.kafka.KafkaUtils - Task [2/10] no partitions assigned
> 32873 [Thread-29-words] INFO backtype.storm.daemon.executor - Activating spout words:(12)
> 32872 [Thread-23-words] INFO backtype.storm.daemon.executor - Opened spout words:(9)
> 32873 [Thread-13-words] INFO backtype.storm.daemon.executor - Opened spout words:(4)
> 32873 [Thread-23-words] INFO backtype.storm.daemon.executor - Activating spout words:(9)
> 32873 [Thread-13-words] INFO backtype.storm.daemon.executor - Activating spout words:(4)
> 37756 [Thread-23-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-17-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-21-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-11-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37773 [Thread-11-words] INFO storm.kafka.PartitionManager - Read partition information from: /twitter/twitter-topic-id/partition_0 --> null
> 37915 [Thread-11-words] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 37915 [Thread-11-words] INFO storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 185
> 37916 [Thread-11-words] INFO backtype.storm.daemon.executor - Opened spout words:(3)
> 37917 [Thread-11-words] INFO backtype.storm.daemon.executor - Activating spout words:(3)
> 62005 [Thread-11-words] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> 62013 [Thread-13-words] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
>
> ------------------------------
> Date: Wed, 30 Mar 2016 10:10:54 +0530
> Subject: Re: Storm KafkaSpout Integration
> From: [email protected]
> To: [email protected]
>
> Hi David,
>
> I think everything is good, but you are missing a statement. Add
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
> after the line
> config.setDebug(true);
>
> *Best regards,*
> *K.Sai Dilip Reddy.*
>
> On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <[email protected]> wrote:
>
> Hi all,
>
> I am currently trying to use TestTopologyStaticHosts to connect the KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance running on my localhost. I have a topic named "twitter-topic" that has some tweets in it. This is all working as expected. I can run the consumer in the terminal and it returns the tweets. I want to use the KafkaSpout to connect to the Kafka topic and pull the tweets into a topology. I have been working on this for a few days now with no success.
>
> So far I have learned that when Storm is run in local mode it uses an in-memory ZooKeeper on port 2000, which would not allow it to connect to the Kafka topic. I have tried to get around this using the following syntax that I found online:
>
>     LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>
> It is still not working, but it seems to be connecting to Kafka, as it gives a 'closed socket connection' message when I cancel the operation (after it does not work and hangs open). It also says in the Storm output that it is connected to localhost 2181, so it seems to be getting that far. I have included the full output from Storm in a txt file attached.
>
> Here is the code I am using in the TestTopologyStaticHosts class:
>
>     public static void main(String[] args) throws Exception {
>
>         //String zkConnString = "localhost:2181";
>
>         GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
>         hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
>         BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
>         // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");
>
>         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "twitter-topic", "/twitter", "twitter-topic-id");
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         //kafkaConfig.forceStartOffsetTime(-2);
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
>         builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
>         LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>         Config config = new Config();
>         config.setDebug(true);
>         // config.put("storm.zookeeper.servers", "localhost");
>         // config.put("storm.zookeeper.port", "2181");
>         cluster.submitTopology("kafka-test", config, builder.createTopology());
>
>         Thread.sleep(600000);
>
>     }
>
> Judging by the output, it seems that there is a problem with connecting to the Kafka partitions.
> I have tried many different things to get it to work but no luck. I have also been looking at using the KafkaSpoutTestTopology class, but it is expecting arguments including 'dockerIp', which I don't understand.
>
> Should I be using Storm in local mode?
> Should I be using the TestTopologyStaticHosts class, or would the KafkaSpoutTestTopology class be better?
>
> Any help at all would be greatly appreciated because I am really stuck.
>
> Kind Regards
> David Kavanagh
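The last lines of the quoted output ("No partition information found, using configuration to determine offset", then "Starting Kafka 127.0.0.1:0 from offset 185") suggest that the spout found no committed offset in ZooKeeper and fell back to a configured start position. Below is a minimal sketch of that kind of decision. It assumes the Kafka convention that -1 means "latest" and -2 means "earliest" (the same -2 as in the commented-out forceStartOffsetTime(-2) call). The class, the method and the example offsets 0 and 185 are hypothetical illustrations, not storm-kafka code.

```java
import java.util.OptionalLong;

public class StartOffsetSketch {

    // Kafka's conventional sentinel values for offset lookups.
    public static final long LATEST_TIME = -1L;
    public static final long EARLIEST_TIME = -2L;

    // Hypothetical helper: pick the offset a consumer starts reading from.
    // A committed offset wins; otherwise the configured sentinel decides.
    public static long chooseStartOffset(OptionalLong committedOffset,
                                         long startOffsetTime,
                                         long earliestOffset,
                                         long latestOffset) {
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();
        }
        return startOffsetTime == EARLIEST_TIME ? earliestOffset : latestOffset;
    }

    public static void main(String[] args) {
        // Assumed for illustration: the topic holds offsets 0..184, so 185 is the log end.
        long earliest = 0L;
        long latest = 185L;

        long fromLatest = chooseStartOffset(OptionalLong.empty(), LATEST_TIME, earliest, latest);
        long fromEarliest = chooseStartOffset(OptionalLong.empty(), EARLIEST_TIME, earliest, latest);

        System.out.println("start with latest:   " + fromLatest);
        System.out.println("start with earliest: " + fromEarliest);
    }
}
```

If offset 185 really is the end of the topic, a spout starting there would only see tweets produced after the topology starts. That would be consistent with the output showing nothing but tick messages, and re-enabling the forceStartOffsetTime(-2) line may be worth trying.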
