Hey David, I would be interested in seeing which Kafka spouts you found online and why you found them better. Also, if you have open-sourced your own Kafka spout on GitHub, a link to that would be great too.
Thanks
Tid

On Thu, Mar 31, 2016 at 2:21 AM, david kavanagh <[email protected]> wrote:

> Hi Spico,
>
> I changed the parallelism as you suggested but it didn't work. Yesterday
> evening I gave up on using the KafkaSpout class that comes with Storm. I
> found some Kafka consumer Java classes online and wrote my own Kafka spout,
> which is working fine. Thanks for the advice anyway.
>
> Regards
> David
>
> ------------------------------
> Date: Wed, 30 Mar 2016 21:33:38 +0300
> Subject: Re: Storm KafkaSpout Integration
> From: [email protected]
> To: [email protected]
>
> Hi,
> I think the problem is that you have set up one partition per topic, but
> you try to consume with 10 Kafka spout tasks.
> Check this line:
>     builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
> The 10 is the task parallelism for the spout, which in the case of Kafka
> should be the same as the number of partitions you have set up for the
> Kafka topic. You use more than one Kafka partition when you would like to
> consume the data from the topic in parallel. Please check the very good
> documentation on Kafka partitions on the Confluent site.
> In my opinion, setting your parallelism hint to 1 would solve the problem.
> The max spout pending has a different meaning.
> Regards,
> Florin
>
> On Wednesday, March 30, 2016, david kavanagh <[email protected]> wrote:
>
> > I am only creating one partition in code here:
> >     GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
> >     hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
> >     BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
> > I hope that answers your question. I am new to both Storm and Kafka, so
> > I am not sure exactly how it works.
> > If I am understanding you correctly, the line you told me to add in the
> > first email should work because I am only creating one partition?
> >     config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
> > Thanks again for the help :-)
> > David
> > ________________________________
> > Date: Wed, 30 Mar 2016 15:36:19 +0530
> > Subject: Re: Storm KafkaSpout Integration
> > From: [email protected]
> > To: [email protected]
> >
> > Hi David,
> >
> > Can I know how many partitions you have?
> > The statement I gave you is the default. If you are running with a
> > different number of partitions, make sure you give the same number, e.g.
> > if you are running with two partitions, change the number to 2 in the
> > statement:
> >     config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
> >
> > Best regards,
> > K.Sai Dilip Reddy.
> >
> > On Wed, Mar 30, 2016 at 3:00 PM, david kavanagh <[email protected]> wrote:
> >
> > Thanks for the reply!
> > I added the line as you suggested but unfortunately there is still no
> > difference.
> > I am just guessing at this stage, but judging by the output below, it
> > seems like it is something to do with the partitioning or the offset.
> > The warnings start by stating that there are more tasks than partitions.
> > Task 1 is assigned the partition that is created in the code
> > (highlighted in green), then the rest of the tasks are not assigned any
> > partitions.
> > Eventually it states 'Read partition information from:
> > /twitter/twitter-topic-id/partition_0 --> null'
> > So it seems like it is not reading data from Kafka at all. I really
> > don't understand what is going on here.
> > Any ideas?
> >
> > Kind Regards
> > David
> > --------------------------------------------------
> > Storm Output:
> > Thread-9-print] INFO backtype.storm.daemon.executor - Prepared bolt print:(2)
> > 32644 [Thread-11-words] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
> > 32685 [Thread-19-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32686 [Thread-19-words] WARN storm.kafka.KafkaUtils - Task [5/10] no partitions assigned
> > 32686 [Thread-17-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32686 [Thread-15-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32686 [Thread-17-words] WARN storm.kafka.KafkaUtils - Task [4/10] no partitions assigned
> > 32686 [Thread-15-words] WARN storm.kafka.KafkaUtils - Task [3/10] no partitions assigned
> > 32686 [Thread-11-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32686 [Thread-11-words] INFO storm.kafka.KafkaUtils - Task [1/10] assigned [Partition{host=127.0.0.1:9092, partition=0}]
> > 32687 [Thread-29-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32697 [Thread-19-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > 32697 [Thread-25-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > 32697 [Thread-29-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > 32697 [Thread-13-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > 32697 [Thread-27-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > 32697 [Thread-15-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > 32689 [Thread-19-words] INFO backtype.storm.daemon.executor - Opened spout words:(7)
> > 32689 [Thread-25-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32689 [Thread-15-words] INFO backtype.storm.daemon.executor - Opened spout words:(5)
> > 32738 [Thread-25-words] WARN storm.kafka.KafkaUtils - Task [8/10] no partitions assigned
> > 32738 [Thread-25-words] INFO backtype.storm.daemon.executor - Opened spout words:(10)
> > 32689 [Thread-17-words] INFO backtype.storm.daemon.executor - Opened spout words:(6)
> > 32688 [Thread-13-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32688 [Thread-21-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32739 [Thread-21-words] WARN storm.kafka.KafkaUtils - Task [6/10] no partitions assigned
> > 32739 [Thread-21-words] INFO backtype.storm.daemon.executor - Opened spout words:(8)
> > 32687 [Thread-27-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32740 [Thread-27-words] WARN storm.kafka.KafkaUtils - Task [9/10] no partitions assigned
> > 32740 [Thread-27-words] INFO backtype.storm.daemon.executor - Opened spout words:(11)
> > 32687 [Thread-23-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > 32740 [Thread-23-words] WARN storm.kafka.KafkaUtils - Task [7/10] no partitions assigned
> > 32736 [Thread-29-words] WARN storm.kafka.KafkaUtils - Task [10/10] no partitions assigned
> > 32742 [Thread-17-words] INFO backtype.storm.daemon.executor - Activating spout words:(6)
> > 32872 [Thread-29-words] INFO backtype.storm.daemon.executor - Opened spout words:(12)
> > 32742 [Thread-25-words] INFO backtype.storm.daemon.executor - Activating spout words:(10)
> > 32742 [Thread-21-words] INFO backtype.storm.daemon.executor - Activating spout words:(8)
> > 32742 [Thread-27-words] INFO backtype.storm.daemon.executor - Activating spout words:(11)
> > 32742 [Thread-19-words] INFO backtype.storm.daemon.executor - Activating spout words:(7)
> > 32741 [Thread-15-words] INFO backtype.storm.daemon.executor - Activating spout words:(5)
> > 32740 [Thread-13-words] WARN storm.kafka.KafkaUtils - Task [2/10] no partitions assigned
> > 32873 [Thread-29-words] INFO backtype.storm.daemon.executor - Activating spout words:(12)
> > 32872 [Thread-23-words] INFO backtype.storm.daemon.executor - Opened spout words:(9)
> > 32873 [Thread-13-words] INFO backtype.storm.daemon.executor - Opened spout words:(4)
> > 32873 [Thread-23-words] INFO backtype.storm.daemon.executor - Activating spout words:(9)
> > 32873 [Thread-13-words] INFO backtype.storm.daemon.executor - Activating spout words:(4)
> > 37756 [Thread-23-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > 37757 [Thread-17-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > 37757 [Thread-21-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > 37757 [Thread-11-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > 37773 [Thread-11-words] INFO storm.kafka.PartitionManager - Read partition information from: /twitter/twitter-topic-id/partition_0 --> null
> > 37915 [Thread-11-words] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> > 37915 [Thread-11-words] INFO storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 185
> > 37916 [Thread-11-words] INFO backtype.storm.daemon.executor - Opened spout words:(3)
> > 37917 [Thread-11-words] INFO backtype.storm.daemon.executor - Activating spout words:(3)
> > 62005 [Thread-11-words] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> > 62013 [Thread-13-words] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> >
> > ________________________________
> > Date: Wed, 30 Mar 2016 10:10:54 +0530
> > Subject: Re: Storm KafkaSpout Integration
> > From: [email protected]
> > To: [email protected]
> >
> > Hi David,
> >
> > I think everything is good, but you are missing a statement:
> >     config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
> > Add it after the line config.setDebug(true);
> >
> > Best regards,
> > K.Sai Dilip Reddy.
> >
> > On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <[email protected]> wrote:
> >
> > Hi all,
> > I am currently trying to use TestTopologyStaticHosts to connect the
> > KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance
> > running on my localhost. I have a topic named "twitter-topic" that has
> > some tweets in it. This is all working as expected. I can run the
> > consumer in the terminal and it returns the tweets. I want to use the
> > KafkaSpout to connect to the Kafka topic and pull the tweets into a
> > topology. I have been working on this for a few days now with no success.
> > So far I have learned that when Storm is run in local mode it uses an
> > in-memory ZooKeeper on port 2000, which would not allow it to connect to
> > the Kafka topic.
> > I have tried to get around this using the following syntax that I found
> > online:
> >     LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
> > It is still not working, but it seems to be connecting to Kafka, as it
> > gives a 'closed socket connection' message when I cancel the operation
> > (after it does not work and hangs open). It also says in the Storm output
> > that it is connected to localhost 2181, so it seems to be getting that far.
> > I have included the full output from Storm in a txt file attached.
> > Here is the code I am using in the TestTopologyStaticHosts class:
> >
> >     public static void main(String[] args) throws Exception {
> >         //String zkConnString = "localhost:2181";
> >         GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
> >         hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
> >         BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
> >         // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");
> >         SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "twitter-topic", "/twitter", "twitter-topic-id");
> >         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> >         //kafkaConfig.forceStartOffsetTime(-2);
> >         TopologyBuilder builder = new TopologyBuilder();
> >         builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
> >         builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
> >         LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
> >         Config config = new Config();
> >         config.setDebug(true);
> >         // config.put("storm.zookeeper.servers", "localhost");
> >         // config.put("storm.zookeeper.port", "2181");
> >         cluster.submitTopology("kafka-test", config, builder.createTopology());
> >         Thread.sleep(600000);
> >     }
> >
> > Judging by the output, it seems that there is a problem with connecting
> > to the Kafka partitions.
> > I have tried many different things to get it to work but no luck. I have
> > also been looking at using the KafkaSpoutTestTopology class, but it is
> > expecting arguments including 'dockerIp', which I don't understand.
> > Should I be using Storm in local mode?
> > Should I be using the TestTopologyStaticHosts class, or would the
> > KafkaSpoutTestTopology class be better?
> > Any help at all would be greatly appreciated because I am really stuck.
> > Kind Regards
> > David Kavanagh
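
For anyone reading this thread later: the "more tasks than partitions" warnings in the Storm output follow from how partitions get spread across spout tasks, which is the point Florin makes about the parallelism hint. Below is a minimal sketch of that idea in plain Java, assuming a simple round-robin rule (task t takes partitions t, t + tasks, t + 2*tasks, ...). It is a simplified model for illustration, not the storm-kafka source, and the class and method names (PartitionAssignmentSketch, partitionsForTask, idleTasks) are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of spreading Kafka partitions over spout tasks.
// Assumption: round-robin assignment. Names here are hypothetical
// and do not come from Storm or storm-kafka.
final class PartitionAssignmentSketch {

    // Returns the partition ids handled by the task with the given 0-based index.
    static List<Integer> partitionsForTask(int numPartitions, int totalTasks, int taskIndex) {
        if (totalTasks <= 0) {
            throw new IllegalArgumentException("totalTasks must be positive");
        }
        List<Integer> assigned = new ArrayList<>();
        // Task t takes partitions t, t + totalTasks, t + 2 * totalTasks, ...
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    // Counts the tasks that end up with no partition at all.
    static int idleTasks(int numPartitions, int totalTasks) {
        int idle = 0;
        for (int t = 0; t < totalTasks; t++) {
            if (partitionsForTask(numPartitions, totalTasks, t).isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        int numPartitions = 1; // one partition, as in the thread
        int totalTasks = 10;   // parallelism hint passed to setSpout
        for (int t = 0; t < totalTasks; t++) {
            List<Integer> mine = partitionsForTask(numPartitions, totalTasks, t);
            System.out.println("Task [" + (t + 1) + "/" + totalTasks + "] "
                + (mine.isEmpty() ? "no partitions assigned" : "assigned " + mine));
        }
        System.out.println("idle tasks: " + idleTasks(numPartitions, totalTasks));
    }
}
```

With one partition and ten tasks, only the first task gets a partition in this model, which matches the shape of the log above. Note that idle tasks by themselves would not stop the one assigned task from reading, and David reports that lowering the parallelism did not fix his problem, so this explains the warnings rather than the missing tuples.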
