Those logs suggest that it is working, but that it has started to consume from the end of the topic.
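The reply turns on how the spout picks the first offset for a partition. Below is a simplified, hypothetical Java model of that choice. It uses the ZooKeeper path and the offset 185 from David's log.

It assumes the storm-kafka 0.9.x behaviour the log suggests: when ZooKeeper holds no offset, `startOffsetTime` is only honoured if `forceFromStart` is true. Otherwise the broker is asked for the latest offset. The sentinel values -2 and -1 are what Kafka's `kafka.api.OffsetRequest.EarliestTime()` and `LatestTime()` return. The earliest offset of 0 is an assumption. ZooKeeper is stood in for by a `Map`. The class and method names are illustrative, not the library's.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of how the storm-kafka 0.9.x spout is assumed to
// pick the first offset for one partition. Not the library's code: ZooKeeper is a
// Map and the broker's earliest/latest offsets are passed in as plain numbers.
public class StartOffsetModel {

    // Sentinel values returned by kafka.api.OffsetRequest.EarliestTime()/LatestTime()
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    // Same shape as the node in the log: zkRoot + "/" + id + "/partition_" + n
    static String committedPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    // Turns a sentinel into a concrete offset; real timestamps are not modelled
    static long resolve(long offsetTime, long earliest, long latest) {
        if (offsetTime == EARLIEST_TIME) {
            return earliest;
        }
        if (offsetTime == LATEST_TIME) {
            return latest;
        }
        throw new IllegalArgumentException("timestamp lookups are not modelled");
    }

    static long chooseStartOffset(Map<String, Long> zk, String path,
                                  boolean forceFromStart, long startOffsetTime,
                                  long earliest, long latest) {
        Long stored = zk.get(path);
        if (stored != null) {
            // Simplification: always resume from a committed offset
            return stored;
        }
        // "--> null" in the log: startOffsetTime is only used when
        // forceFromStart is true; otherwise the spout starts at the end
        long offsetTime = forceFromStart ? startOffsetTime : LATEST_TIME;
        return resolve(offsetTime, earliest, latest);
    }

    public static void main(String[] args) {
        Map<String, Long> zk = new HashMap<>();  // first run: nothing committed yet
        String path = committedPath("/twitter", "twitter-topic-id", 0);
        long earliest = 0L;   // assumption: no messages deleted by retention
        long latest = 185L;   // the offset reported in the log

        System.out.println(path);  // /twitter/twitter-topic-id/partition_0
        // forceFromStart left at false: starts at the end, skipping existing tweets
        System.out.println(chooseStartOffset(zk, path, false, EARLIEST_TIME, earliest, latest));  // 185
        // startOffsetTime = EarliestTime() and forceFromStart = true: starts at the head
        System.out.println(chooseStartOffset(zk, path, true, EARLIEST_TIME, earliest, latest));   // 0
    }
}
```

The model deliberately leaves out the case where `forceFromStart` resets an offset that a different topology instance committed earlier.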
'Read partition information from: /twitter/twitter-topic-id/partition_0 --> null' indicates that there were no consumer offsets in ZooKeeper, which is what you would expect the first time it is run. Right after that, you see:

37915 [Thread-11-words] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
37915 [Thread-11-words] INFO storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 185

It says that it could not find an offset stored in ZooKeeper and that it is starting from offset 185, which I presume is the end of the topic. This means it will not consume what is already in the topic, only new messages.

I think what you want is to make it start at the head of the topic when there is no offset available (or perhaps to always force a start from the beginning, regardless of any offset in ZooKeeper?).

It's not clear to me which version of the Kafka spout you are using, but it looks like to achieve this in 0.9.6 the fields you should be looking at are:

    kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    kafkaConfig.forceFromStart = true;

Cheers,
John

On Wed, Mar 30, 2016 at 2:30 AM david kavanagh <[email protected]> wrote:

> Thanks for the reply!
>
> I added the line as you suggested but there is still no difference
> unfortunately.
> I am just guessing at this stage, but judging by the output below, it
> seems like it is something to do with the partitioning or the offset.
> The warnings start by saying that there are more tasks than partitions.
> Task 1 is assigned the partition that is created in the code (highlighted
> in green), then the rest of the tasks are not assigned any partitions.
> Eventually it states 'Read partition information from:
> /twitter/twitter-topic-id/partition_0 --> null'
>
> So it seems like it is not reading data from Kafka at all. I really don't
> understand what is going on here.
> Any ideas?
>
> Kind Regards
> David
>
> --------------------------------------------------
>
> *Storm Output:*
>
> Thread-9-print] INFO backtype.storm.daemon.executor - Prepared bolt print:(2)
> 32644 [Thread-11-words] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
> 32685 [Thread-19-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-19-words] WARN storm.kafka.KafkaUtils - Task [5/10] no partitions assigned
> 32686 [Thread-17-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-15-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-17-words] WARN storm.kafka.KafkaUtils - Task [4/10] no partitions assigned
> 32686 [Thread-15-words] WARN storm.kafka.KafkaUtils - Task [3/10] no partitions assigned
> 32686 [Thread-11-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32686 [Thread-11-words] INFO storm.kafka.KafkaUtils - Task [1/10] assigned [Partition{host=127.0.0.1:9092, partition=0}]
> 32687 [Thread-29-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32697 [Thread-19-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-25-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-29-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-13-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-27-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32697 [Thread-15-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 32689 [Thread-19-words] INFO backtype.storm.daemon.executor - Opened spout words:(7)
> 32689 [Thread-25-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32689 [Thread-15-words] INFO backtype.storm.daemon.executor - Opened spout words:(5)
> 32738 [Thread-25-words] WARN storm.kafka.KafkaUtils - Task [8/10] no partitions assigned
> 32738 [Thread-25-words] INFO backtype.storm.daemon.executor - Opened spout words:(10)
> 32689 [Thread-17-words] INFO backtype.storm.daemon.executor - Opened spout words:(6)
> 32688 [Thread-13-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32688 [Thread-21-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32739 [Thread-21-words] WARN storm.kafka.KafkaUtils - Task [6/10] no partitions assigned
> 32739 [Thread-21-words] INFO backtype.storm.daemon.executor - Opened spout words:(8)
> 32687 [Thread-27-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-27-words] WARN storm.kafka.KafkaUtils - Task [9/10] no partitions assigned
> 32740 [Thread-27-words] INFO backtype.storm.daemon.executor - Opened spout words:(11)
> 32687 [Thread-23-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> 32740 [Thread-23-words] WARN storm.kafka.KafkaUtils - Task [7/10] no partitions assigned
> 32736 [Thread-29-words] WARN storm.kafka.KafkaUtils - Task [10/10] no partitions assigned
> 32742 [Thread-17-words] INFO backtype.storm.daemon.executor - Activating spout words:(6)
> 32872 [Thread-29-words] INFO backtype.storm.daemon.executor - Opened spout words:(12)
> 32742 [Thread-25-words] INFO backtype.storm.daemon.executor - Activating spout words:(10)
> 32742 [Thread-21-words] INFO backtype.storm.daemon.executor - Activating spout words:(8)
> 32742 [Thread-27-words] INFO backtype.storm.daemon.executor - Activating spout words:(11)
> 32742 [Thread-19-words] INFO backtype.storm.daemon.executor - Activating spout words:(7)
> 32741 [Thread-15-words] INFO backtype.storm.daemon.executor - Activating spout words:(5)
> 32740 [Thread-13-words] WARN storm.kafka.KafkaUtils - Task [2/10] no partitions assigned
> 32873 [Thread-29-words] INFO backtype.storm.daemon.executor - Activating spout words:(12)
> 32872 [Thread-23-words] INFO backtype.storm.daemon.executor - Opened spout words:(9)
> 32873 [Thread-13-words] INFO backtype.storm.daemon.executor - Opened spout words:(4)
> 32873 [Thread-23-words] INFO backtype.storm.daemon.executor - Activating spout words:(9)
> 32873 [Thread-13-words] INFO backtype.storm.daemon.executor - Activating spout words:(4)
> 37756 [Thread-23-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-17-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-21-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37757 [Thread-11-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 37773 [Thread-11-words] INFO storm.kafka.PartitionManager - Read partition information from: /twitter/twitter-topic-id/partition_0 --> null
> 37915 [Thread-11-words] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 37915 [Thread-11-words] INFO storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 185
> 37916 [Thread-11-words] INFO backtype.storm.daemon.executor - Opened spout words:(3)
> 37917 [Thread-11-words] INFO backtype.storm.daemon.executor - Activating spout words:(3)
> 62005 [Thread-11-words] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> 62013 [Thread-13-words] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
>
> ------------------------------
> Date: Wed, 30 Mar 2016 10:10:54 +0530
> Subject: Re: Storm KafkaSpout Integration
> From: [email protected]
> To: [email protected]
>
> Hi David,
>
> I think everything is good, but you are missing a statement:
> config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
> Add it after the line config.setDebug(true);
>
> *Best regards,*
> *K.Sai Dilip Reddy.*
>
> On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <[email protected]>
> wrote:
>
> Hi all,
>
> I am currently trying to use TestTopologyStaticHosts to connect the
> KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance
> running on my localhost. I have a topic named "twitter-topic" that has some
> tweets in it. This is all working as expected: I can run the consumer in
> the terminal and it returns the tweets. I want to use the KafkaSpout to
> connect to the Kafka topic and pull the tweets into a topology. I have
> been working on this for a few days now with no success.
>
> So far I have learned that when Storm is run in local mode, it uses an
> in-memory ZooKeeper on port 2000, which would not allow it to connect to
> the Kafka topic.
> I have tried to get around this using the following syntax
> that I found online:
>
> LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>
> It is still not working, but it seems to be connecting to Kafka, as it gives
> a 'closed socket connection' message when I cancel the operation (after it
> hangs without doing anything). The Storm output also says that it is
> connected to localhost:2181, so it seems to be getting that far. I have
> attached the full output from Storm as a .txt file.
>
> Here is the code I am using in the TestTopologyStaticHosts class:
>
> public static void main(String[] args) throws Exception {
>
>     //String zkConnString = "localhost:2181";
>
>     GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
>     hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
>     BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
>     // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");
>
>     SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts,
>             "twitter-topic", "/twitter", "twitter-topic-id");
>     kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>     //kafkaConfig.forceStartOffsetTime(-2);
>
>     TopologyBuilder builder = new TopologyBuilder();
>     builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
>     builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
>     LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
>     Config config = new Config();
>     config.setDebug(true);
>     // config.put("storm.zookeeper.servers", "localhost");
>     // config.put("storm.zookeeper.port", "2181");
>     cluster.submitTopology("kafka-test", config,
>             builder.createTopology());
>
>     Thread.sleep(600000);
>
> }
>
> Judging by the output, it seems that there is a problem with connecting to
> the Kafka partitions.
> I have tried many different things to get it to work, but no luck.
> I have also been looking at using the KafkaSpoutTestTopology class, but it
> expects arguments including 'dockerIp', which I don't understand.
>
> Should I be using Storm in local mode?
> Should I be using the TestTopologyStaticHosts class, or would the
> KafkaSpoutTestTopology class be better?
>
> Any help at all would be greatly appreciated, because I am really stuck.
>
> Kind Regards
> David Kavanagh
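David's second message opens with "there are more tasks than partitions (tasks: 10; partitions: 1)". That warning follows from starting the spout with a parallelism of 10 against a single-partition topic.

The sketch below assumes the spout splits partitions round-robin across its tasks. That is consistent with the log, where only Task [1/10] is assigned partition 0 and the other nine report no partitions. It is a simplified, hypothetical reproduction of that split; the class and method names are illustrative, not storm-kafka's.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the round-robin split that the
// "more tasks than partitions" warnings point to: 0-based task i takes
// partitions i, i + totalTasks, i + 2 * totalTasks, ...
public class PartitionAssignmentModel {

    static List<Integer> partitionsForTask(int numPartitions, int totalTasks, int taskIndex) {
        if (taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("taskIndex must be in [0, totalTasks)");
        }
        List<Integer> assigned = new ArrayList<>();
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int numPartitions = 1;  // twitter-topic has only partition 0
        int totalTasks = 10;    // parallelism passed to builder.setSpout("words", ..., 10)

        if (numPartitions < totalTasks) {
            System.out.println("more tasks than partitions, some tasks will be idle");
        }
        for (int task = 0; task < totalTasks; task++) {
            // The log counts tasks from 1, so print task + 1
            System.out.println("Task [" + (task + 1) + "/" + totalTasks + "] -> "
                    + partitionsForTask(numPartitions, totalTasks, task));
        }
    }
}
```

Under this model, only `Task [1/10]` gets `[0]` and the remaining nine get `[]`, so the idle tasks are harmless. Matching the spout parallelism to the partition count, e.g. `builder.setSpout("words", new KafkaSpout(kafkaConfig), 1)`, would silence the warnings. It would not change which offset the spout starts from.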
