unsubscribe

2016-04-01 10:18 GMT+01:00 david kavanagh <[email protected]>:

> Hey,
>
> What I found online was a simple Java Kafka consumer, and I used that code
> to write my own KafkaSpout.
> Here is the link to the consumer that I found:
> http://wpcertification.blogspot.ie/2014/08/java-client-for-publishing-and.html
>
> I am using my implementation for a college assignment. It's working well
> enough for me to do what I need, but I doubt very much that it would be
> production quality. Here is the code anyway; it might be useful for
> something.
>
> Regards
> David
>
> -----------------------------------------------------------------------
>
> package storm.kafka;
>
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichSpout;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
>
> import java.nio.charset.StandardCharsets;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> import kafka.consumer.Consumer;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
>
> public class MyKafkaSpout extends BaseRichSpout {
>
>     SpoutOutputCollector _collector;
>     KafkaStream<byte[], byte[]> _stream;
>     ConsumerIterator<byte[], byte[]> _iterator;
>
>     // Wraps the Kafka 0.8 high-level consumer (offsets are tracked in ZooKeeper).
>     public static class KafkaConnector {
>
>         final static String clientId = "KafkaTweetConsumer";
>         final static String TOPIC = "twitter-topic";
>         ConsumerConnector consumerConnector;
>
>         public KafkaConnector() {
>             Properties properties = new Properties();
>             properties.put("zookeeper.connect", "localhost:2181");
>             properties.put("group.id", "test-group");
>             ConsumerConfig consumerConfig = new ConsumerConfig(properties);
>             consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
>         }
>
>         // Requests a single stream for TOPIC and returns it.
>         public KafkaStream<byte[], byte[]> getStream() {
>             Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>             topicCountMap.put(TOPIC, 1);
>             Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>                     consumerConnector.createMessageStreams(topicCountMap);
>             return consumerMap.get(TOPIC).get(0);
>         }
>     }
>
>     @Override
>     public void open(Map conf, final TopologyContext context,
>                      final SpoutOutputCollector collector) {
>         _collector = collector;
>         KafkaConnector kafkaConnector = new KafkaConnector();
>         _stream = kafkaConnector.getStream();
>         _iterator = _stream.iterator(); // obtain the iterator once, not per tuple
>     }
>
>     @Override
>     public void nextTuple() {
>         // Note: next() blocks until a message arrives, which stalls the spout thread.
>         String message = new String(_iterator.next().message(), StandardCharsets.UTF_8);
>         _collector.emit(new Values(message));
>     }
>
>     @Override
>     public void ack(Object id) {
>     }
>
>     @Override
>     public void fail(Object id) {
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("tweet"));
>     }
> }
>
> ------------------------------
> Date: Thu, 31 Mar 2016 11:12:38 -0700
> Subject: Re: Storm KafkaSpout Integration
> From: [email protected]
> To: [email protected]
>
> Hey David,
>
> I would be interested in seeing which Kafka spouts you found online and why
> you found them better.
> Also, if you have your own Kafka spout open-sourced on GitHub, a link to
> that would be great too.
>
> Thanks
> Tid
>
> On Thu, Mar 31, 2016 at 2:21 AM, david kavanagh <[email protected]> wrote:
>
> > Hi Spico,
> >
> > I changed the parallelism as you suggested, but it didn't work.
> > Yesterday evening I gave up on using the KafkaSpout class that comes with
> > Storm. I found some Kafka consumer Java classes online and wrote my own
> > Kafka spout, which is working fine. Thanks for the advice anyway.
> >
> > Regards
> > David
> >
> > ------------------------------
> > Date: Wed, 30 Mar 2016 21:33:38 +0300
> > Subject: Re: Storm KafkaSpout Integration
> > From: [email protected]
> > To: [email protected]
> >
> > Hi,
> > I think the problem you have is that you have set up one partition for the
> > topic, but you are trying to consume it with 10 Kafka spout tasks.
> > Check this line:
> >
> > builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
> >
> > 10 represents the task parallelism for the spout. In the case of Kafka it
> > should be the same as the number of partitions you have set up for the
> > Kafka topic. You use more than one Kafka partition when you want to
> > consume the data from the topic in parallel. Please check the very good
> > documentation on Kafka partitions on the Confluent site.
> > In my opinion, setting the parallelism hint to 1 would solve the problem.
> > The max spout pending setting has a different meaning.
> > regards,
> > florin
> >
> > On Wednesday, March 30, 2016, david kavanagh <[email protected]> wrote:
> >
> > > I am only creating one partition in code here:
> > >
> > > GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
> > > hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
> > > BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
> > >
> > > I hope that answers your question. I am new to both Storm and Kafka, so
> > > I am not sure exactly how it works.
> > >
> > > If I am understanding you correctly, the line you told me to add in the
> > > first email should work because I am only creating one partition?
> > >
> > > config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
> > >
> > > Thanks again for the help :-)
> > >
> > > David
> > >
> > > ________________________________
> > > Date: Wed, 30 Mar 2016 15:36:19 +0530
> > > Subject: Re: Storm KafkaSpout Integration
> > > From: [email protected]
> > > To: [email protected]
> > >
> > > Hi David,
> > >
> > > Can I know how many partitions you have?
> > > The statement I gave you is the default. If you are running with several
> > > partitions, make sure you give the same number. For example, if you are
> > > running with two partitions, change the number to 2 in the statement:
> > >
> > > config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);
> > >
> > > Best regards,
> > > K.Sai Dilip Reddy.
> > >
> > > On Wed, Mar 30, 2016 at 3:00 PM, david kavanagh <[email protected]> wrote:
> > >
> > > Thanks for the reply!
> > > I added the line as you suggested, but unfortunately there is still no
> > > difference.
> > > I am just guessing at this stage, but judging by the output below, it
> > > seems like it is something to do with the partitioning or the offset.
> > > The warnings start by stating that there are more tasks than partitions.
> > > Task 1 is assigned the partition that is created in the code
> > > (highlighted in green), then the rest of the tasks are not assigned any
> > > partitions.
> > > Eventually it states 'Read partition information from:
> > > /twitter/twitter-topic-id/partition_0 --> null'.
> > > So it seems like it is not reading data from Kafka at all. I really
> > > don't understand what is going on here.
> > > Any ideas?
> > >
> > > Kind Regards
> > > David
> > > --------------------------------------------------
> > > Storm Output:
> > > Thread-9-print] INFO backtype.storm.daemon.executor - Prepared bolt print:(2)
> > > 32644 [Thread-11-words] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
> > > 32685 [Thread-19-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > > 32686 [Thread-19-words] WARN storm.kafka.KafkaUtils - Task [5/10] no partitions assigned
> > > 32686 [Thread-17-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > > 32686 [Thread-15-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > > 32686 [Thread-17-words] WARN storm.kafka.KafkaUtils - Task [4/10] no partitions assigned
> > > 32686 [Thread-15-words] WARN storm.kafka.KafkaUtils - Task [3/10] no partitions assigned
> > > 32686 [Thread-11-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > > 32686 [Thread-11-words] INFO storm.kafka.KafkaUtils - Task [1/10] assigned [Partition{host=127.0.0.1:9092, partition=0}]
> > > 32687 [Thread-29-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > > 32697 [Thread-19-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > > 32697 [Thread-25-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > > 32697 [Thread-29-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > > 32697 [Thread-13-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > > 32697 [Thread-27-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > > 32697 [Thread-15-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > > 32689 [Thread-19-words] INFO backtype.storm.daemon.executor - Opened spout words:(7)
> > > 32689 [Thread-25-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > > 32689 [Thread-15-words] INFO backtype.storm.daemon.executor - Opened spout words:(5)
> > > 32738 [Thread-25-words] WARN storm.kafka.KafkaUtils - Task [8/10] no partitions assigned
> > > 32738 [Thread-25-words] INFO backtype.storm.daemon.executor - Opened spout words:(10)
> > > 32689 [Thread-17-words] INFO backtype.storm.daemon.executor - Opened spout words:(6)
> > > 32688 [Thread-13-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > > 32688 [Thread-21-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > > 32739 [Thread-21-words] WARN storm.kafka.KafkaUtils - Task [6/10] no partitions assigned
> > > 32739 [Thread-21-words] INFO backtype.storm.daemon.executor - Opened spout words:(8)
> > > 32687 [Thread-27-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > > 32740 [Thread-27-words] WARN storm.kafka.KafkaUtils - Task [9/10] no partitions assigned
> > > 32740 [Thread-27-words] INFO backtype.storm.daemon.executor - Opened spout words:(11)
> > > 32687 [Thread-23-words] WARN storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
> > > 32740 [Thread-23-words] WARN storm.kafka.KafkaUtils - Task [7/10] no partitions assigned
> > > 32736 [Thread-29-words] WARN storm.kafka.KafkaUtils - Task [10/10] no partitions assigned
> > > 32742 [Thread-17-words] INFO backtype.storm.daemon.executor - Activating spout words:(6)
> > > 32872 [Thread-29-words] INFO backtype.storm.daemon.executor - Opened spout words:(12)
> > > 32742 [Thread-25-words] INFO backtype.storm.daemon.executor - Activating spout words:(10)
> > > 32742 [Thread-21-words] INFO backtype.storm.daemon.executor - Activating spout words:(8)
> > > 32742 [Thread-27-words] INFO backtype.storm.daemon.executor - Activating spout words:(11)
> > > 32742 [Thread-19-words] INFO backtype.storm.daemon.executor - Activating spout words:(7)
> > > 32741 [Thread-15-words] INFO backtype.storm.daemon.executor - Activating spout words:(5)
> > > 32740 [Thread-13-words] WARN storm.kafka.KafkaUtils - Task [2/10] no partitions assigned
> > > 32873 [Thread-29-words] INFO backtype.storm.daemon.executor - Activating spout words:(12)
> > > 32872 [Thread-23-words] INFO backtype.storm.daemon.executor - Opened spout words:(9)
> > > 32873 [Thread-13-words] INFO backtype.storm.daemon.executor - Opened spout words:(4)
> > > 32873 [Thread-23-words] INFO backtype.storm.daemon.executor - Activating spout words:(9)
> > > 32873 [Thread-13-words] INFO backtype.storm.daemon.executor - Activating spout words:(4)
> > > 37756 [Thread-23-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > > 37757 [Thread-17-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > > 37757 [Thread-21-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > > 37757 [Thread-11-words-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> > > 37773 [Thread-11-words] INFO storm.kafka.PartitionManager - Read partition information from: /twitter/twitter-topic-id/partition_0 --> null
> > > 37915 [Thread-11-words] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> > > 37915 [Thread-11-words] INFO storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 185
> > > 37916 [Thread-11-words] INFO backtype.storm.daemon.executor - Opened spout words:(3)
> > > 37917 [Thread-11-words] INFO backtype.storm.daemon.executor - Activating spout words:(3)
> > > 62005 [Thread-11-words] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> > > 62013 [Thread-13-words] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
> > >
> > > ________________________________
> > > Date: Wed, 30 Mar 2016 10:10:54 +0530
> > > Subject: Re: Storm KafkaSpout Integration
> > > From: [email protected]
> > > To: [email protected]
> > >
> > > Hi David,
> > >
> > > I think everything is good, but you are missing a statement:
> > > config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); add it after the line
> > > config.setDebug(true);
> > >
> > > Best regards,
> > > K.Sai Dilip Reddy.
> > >
> > > On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <[email protected]> wrote:
> > >
> > > Hi all,
> > >
> > > I am currently trying to use TestTopologyStaticHosts to connect the
> > > KafkaSpout to a Kafka topic. I have a ZooKeeper and a Kafka instance
> > > running on my localhost. I have a topic named "twitter-topic" that has
> > > some tweets in it. This is all working as expected: I can run the
> > > consumer in the terminal and it returns the tweets. I want to use the
> > > KafkaSpout to connect to the Kafka topic and pull the tweets into a
> > > topology. I have been working on this for a few days now with no success.
> > > So far I have learned that when Storm is run in local mode, it uses an
> > > in-memory ZooKeeper on port 2000, which would not allow it to connect to
> > > the Kafka topic.
> > > I have tried to get around this using the following syntax that I found
> > > online:
> > >
> > > LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
> > >
> > > It is still not working, but it seems to be connecting to Kafka, as it
> > > gives a 'closed socket connection' message when I cancel the operation
> > > (after it fails and hangs). The Storm output also says that it is
> > > connected to localhost 2181, so it seems to be getting that far.
> > > I have included the full output from Storm in the attached txt file.
> > >
> > > Here is the code I am using in the TestTopologyStaticHosts class:
> > >
> > > public static void main(String[] args) throws Exception {
> > >     //String zkConnString = "localhost:2181";
> > >     GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
> > >     hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
> > >     BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
> > >     // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");
> > >     SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts,
> > >             "twitter-topic", "/twitter", "twitter-topic-id");
> > >     kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> > >     //kafkaConfig.forceStartOffsetTime(-2);
> > >     TopologyBuilder builder = new TopologyBuilder();
> > >     builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
> > >     builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");
> > >     LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
> > >     Config config = new Config();
> > >     config.setDebug(true);
> > >     // config.put("storm.zookeeper.servers", "localhost");
> > >     // config.put("storm.zookeeper.port", "2181");
> > >     cluster.submitTopology("kafka-test", config, builder.createTopology());
> > >     Thread.sleep(600000);
> > > }
> > >
> > > Judging by the output, it seems that there is a problem with connecting
> > > to the Kafka partitions.
> > > I have tried many different things to get it to work, but no luck.
> > > I have also been looking at using the KafkaSpoutTestTopology class, but it
> > > expects arguments including 'dockerIp', which I don't understand.
> > > Should I be using Storm in local mode?
> > > Should I be using the TestTopologyStaticHosts class, or would the
> > > KafkaSpoutTestTopology class be better?
> > > Any help at all would be greatly appreciated because I am really stuck.
> > >
> > > Kind Regards
> > >
> > > David Kavanagh

--
Akah Larry N.H
Android Platform Engineer
Founder IceTeck
www.iceteck.com
Developing technologies for emergence and sustainable development.
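The KafkaUtils warnings in the quoted Storm output ("there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle") match florin's diagnosis. The spout was given a parallelism hint of 10 for a topic with a single partition. Below is a minimal sketch that uses only the standard library. It models partition assignment as round-robin by task index, so task i reads partitions i, i + tasks, i + 2 * tasks, and so on. That rule, and the class and method names, are assumptions for illustration. This is not the storm-kafka source.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of spreading Kafka partitions over spout tasks.
 * ASSUMPTION: round-robin by task index; hypothetical names, not storm-kafka code.
 */
public class PartitionAssignment {

    /** Partition ids that the 0-based task `taskIndex` would read. */
    static List<Integer> partitionsForTask(int numPartitions, int totalTasks, int taskIndex) {
        List<Integer> assigned = new ArrayList<>();
        // Task i takes partitions i, i + totalTasks, i + 2 * totalTasks, ...
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    /** Number of tasks that end up with no partition at all. */
    static int idleTasks(int numPartitions, int totalTasks) {
        int idle = 0;
        for (int t = 0; t < totalTasks; t++) {
            if (partitionsForTask(numPartitions, totalTasks, t).isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        int partitions = 1; // one partition, as in the StaticHosts setup in the thread
        int tasks = 10;     // parallelism hint passed to builder.setSpout(...)
        for (int t = 0; t < tasks; t++) {
            List<Integer> assigned = partitionsForTask(partitions, tasks, t);
            System.out.println("Task [" + (t + 1) + "/" + tasks + "] "
                    + (assigned.isEmpty() ? "no partitions assigned" : "assigned " + assigned));
        }
        System.out.println("idle tasks: " + idleTasks(partitions, tasks));
    }
}
```

With partitions = 1 and tasks = 10, it prints "Task [1/10] assigned [0]", then nine "no partitions assigned" lines, then "idle tasks: 9". This mirrors the log. With tasks = 1, as florin suggests, no task is left idle.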

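David's MyKafkaSpout reads Kafka directly inside nextTuple(). ConsumerIterator.next() blocks until a message arrives, and with the 0.8 high-level consumer it waits indefinitely unless consumer.timeout.ms is set. Storm's ISpout contract asks for nextTuple() to be non-blocking, because nextTuple(), ack() and fail() are called in a loop on the same thread. One common workaround is to read Kafka on a background thread and hand messages to the spout through a queue.

The sketch below models only that hand-off, using java.util.concurrent. The names NonBlockingSpoutSketch and offer() are hypothetical, as is the Consumer<String> that stands in for SpoutOutputCollector.emit. No Storm or Kafka classes are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/**
 * Sketch of a spout-style nextTuple() that never blocks.
 * HYPOTHETICAL: `emit` stands in for SpoutOutputCollector.emit, and offer()
 * stands in for a background thread draining a Kafka ConsumerIterator.
 */
public class NonBlockingSpoutSketch {
    // Filled by a background reader thread, drained by nextTuple().
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final Consumer<String> emit;

    NonBlockingSpoutSketch(Consumer<String> emit) {
        this.emit = emit;
    }

    /** Called by the background reader for each message it receives. */
    void offer(String message) {
        buffer.offer(message);
    }

    /** Emits at most one buffered message; returns false at once if none is waiting. */
    boolean nextTuple() {
        String message = buffer.poll(); // returns null immediately when empty
        if (message == null) {
            return false;
        }
        emit.accept(message);
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> emitted = new ArrayList<>();
        NonBlockingSpoutSketch spout = new NonBlockingSpoutSketch(emitted::add);

        // Simulated background reader in place of ConsumerIterator.next().
        Thread reader = new Thread(() -> {
            spout.offer("tweet-1");
            spout.offer("tweet-2");
        });
        reader.start();
        reader.join();

        while (spout.nextTuple()) {
            // keep emitting until the buffer is empty
        }
        System.out.println(emitted);           // [tweet-1, tweet-2]
        System.out.println(spout.nextTuple()); // false: empty buffer, no waiting
    }
}
```

In a real spout, the background thread would loop over the stream's iterator calling offer(). When poll() returns null, nextTuple() could sleep for about a millisecond, as the ISpout Javadoc suggests, instead of spinning the CPU.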