Storm-kafka Integration
Hi, I am trying to integrate Storm with Kafka, but I could not find any proper documentation for doing so. Can anyone please help me get started with the integration? Thanks and Regards, Komal Thombare =-=-= Notice: The information contained in this e-mail message and/or attachments to it may contain confidential or privileged information. If you are not the intended recipient, any dissemination, use, review, distribution, printing or copying of the information contained in this e-mail message and/or attachments to it are strictly prohibited. If you have received this communication in error, please notify us by reply e-mail or telephone and immediately and permanently delete the message and any attachments. Thank you
Re: Storm-kafka Integration
Do you use Trident or only Storm? Regards, Andres
Re: Storm-kafka Integration
Only Storm. Thanks and Regards, Komal Thombare Tata Consultancy Services Limited Ph:- 086-55388772 Mail-to: komal.thomb...@tcs.com Website: http://www.tcs.com Experience certainty. IT Services Business Solutions Consulting
Re: Storm-kafka Integration
Hi Komal, Have you looked at KafkaSpout? -- Thanks Deepak www.bigdatabig.com www.keosha.net
Re: Storm-kafka Integration
Hi Deepak, Yes, I have. I have also got the storm-contrib source code, but I am unaware of how to compile it. Thanks and Regards, Komal Thombare
Re: Storm-kafka Integration
Please take a look at http://www.michael-noll.com/blog/2014/05/27/kafka-storm-integration-example-tutorial/#state-of-the-integration-game There is a GitHub project too: https://github.com/miguno/kafka-storm-starter This covers the latest Storm, Kafka and Avro. Joe Stein, Founder, Principal Consultant, Big Data Open Source Security LLC, http://www.stealth.ly, Twitter: @allthingshadoop
Re: Storm-kafka Integration
You should make a clone of this project: https://github.com/apache/incubator-storm/tree/master/external/storm-kafka and do "mvn install". I suppose you use Kafka 0.8+. Then you do something like this (this config is an example!):

    SpoutConfig spoutConfig = new SpoutConfig(
            new ZkHosts("localhost:2181"),  // ZooKeeper host(s)
            "myTopic",                      // Kafka topic
            "/kafkaStorm/",                 // ZooKeeper root path
            "storm");                       // consumer id
    KafkaSpout spout = new KafkaSpout(spoutConfig);

Regards, Andres
Worker dies (bolt)
Hi, I am using apache-storm-0.9.1-incubating. I have a simple topology: a spout reads from a Kafka topic and a bolt writes the lines from the spout to HBase. Recently we did a test: we sent 300 000 000 messages over kafka-rest -> Kafka queue -> Storm topology -> HBase. I noticed that after around one hour and around 2500 messages the worker died. The PID is there and the process is up, but the bolt's execute method hangs. The bolt's code is:

    package storm;

    import java.util.Map;
    import java.util.UUID;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.HTablePool;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class HBaseWriterBolt extends BaseBasicBolt {

        HTableInterface usersTable;
        HTablePool pool;
        int count = 0;

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
            conf.set("hbase.defaults.for.version.skip", "true");
            conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
            //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

            pool = new HTablePool(conf, 1);
            usersTable = pool.getTable("kafkademo1");
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String line = tuple.getString(0);

            Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
            p.add(Bytes.toBytes("info"), Bytes.toBytes(line), Bytes.toBytes(line));

            try {
                usersTable.put(p);
                count++;
                System.out.println("Count: " + count);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            collector.emit(new Values(line));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }

        @Override
        public void cleanup() {
            try {
                usersTable.close();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

The line System.out.println("Count: " + count); in the execute method was added for debugging, to see in the log that the bolt is running. In the spout's nextTuple() method I added the debug line: System.out.println("Message from the Topic ..."); After some time, around 50 minutes, I can see in the log file that the spout is working but the bolt has died. Any ideas? -- Best regards, Margus (Margusja) Roo +372 51 48 780 http://margus.roo.ee http://ee.linkedin.com/in/margusroo skype: margusja ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)
Re: Storm-kafka Integration
Dear Komal, I used the storm-kafka version mentioned in this link: http://anisnasir.wordpress.com/2014/05/25/apache-storm-kafka-a-nice-choice/ Regards, Anis

On Mon, Jun 2, 2014 at 1:49 PM, Komal Thombare komal.thomb...@tcs.com wrote: Hi Andres, I am still getting the same error. Thanks and Regards, Komal Thombare

-Andres Gomez wrote: - Hi again, I advise you to use this storm-kafka repo: https://github.com/apache/incubator-storm/tree/master/external/storm-kafka 1. make a clone of the project 2. mvn package (generates the project jar) 3. mvn install (installs the project into the local repository) Regards, Andres

On 02/06/2014, at 12:56, Komal Thombare wrote: Hi, I have downloaded the storm-kafka jar from the link below: http://repo1.maven.org/maven2/com/n3twork/storm/storm-kafka/20140521/ Now I have made changes to the Storm word count topology and used KafkaSpout. While running the topology in local mode I get the following error:

    java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
        at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-20140521.jar:na]
        at backtype.storm.daemon.executor$eval5100$fn__5101$fn__5116.invoke(executor.clj:519) ~[na:na]
        at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
        at java.lang.Thread.run(Thread.java:636) [na:1.6.0_17]
    Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy
        at java.net.URLClassLoader$1.run(URLClassLoader.java:217) ~[na:1.6.0_17]
        at java.security.AccessController.doPrivileged(Native Method) ~[na:1.6.0_17]
        at java.net.URLClassLoader.findClass(URLClassLoader.java:205) ~[na:1.6.0_17]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:319) ~[na:1.6.0_17]
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294) ~[na:1.6.0_17]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:264) ~[na:1.6.0_17]
        at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:332) ~[na:1.6.0_17]

Though curator-client-1.0.1.jar is present in the lib directory of Storm and I have included it in my build path. Thanks and Regards, Komal Thombare
Strange latency behaviour using 1 core (in a multiple-core processor)
Hi, We are doing a benchmark test, limiting the number of cores used by Storm. The topology contains 1 spout and 6 bolts; the first bolt has the highest load. Our experiment: we run the topology on one single machine with 4 cores. We start with 1 core and then increase to 2, 3 and 4 cores. We measure latency with a timestamp when the spout emits the tuple and when the tuple reaches the last bolt. The result: the latency results obtained from the tests with 2, 3 and 4 cores are as we expected (latency varies from 10 ms to 20 ms); however, we got strange values from the tests with only 1 core that we cannot explain: latency varies from 4 ms to 120 ms. Does anyone have any idea why? Thanks Baoh
Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?
The bolt base classes have a prepare method: https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html and the spout base classes have a similar activate method: https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html Is that sufficient for your needs or were you thinking of something different? Marc On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote: Hi there - I would like to set up some state that spouts and bolts share, and I'd like to prepare this state when the StormTopology gets 'activated' on a worker. it would be great if the StormTopology had something like a prepare or open method to indicate when it is starting. I looked but i could find no such API. Maybe I should submit an enhancement request ? Thanks in advance for your responses, - Chris [ if anyone is curious, the shared state is for all my application code to check or not check invariants. the invariant checking takes additional time, so we don't want to do it in production.. but during testing/development it helps catch bugs]. -- Chris Bedford Founder Lead Lackey Build Lackey Labs: http://buildlackey.com Go Grails!: http://blog.buildlackey.com
storm-kafka external project
First, there is a small typo-like error in: https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java line 217: if (lastCompletedOffset != lastCompletedOffset) { I guess it should be something like: if (_committedTo != lastCompletedOffset) { Without that, it will never save the position information. The next thing is the Kafka bolt. Configuration like "get the topic name from the global configuration" is probably not the best way: I may want to attach two bolts and have each send to a different queue. Something like SpoutConfig for the spout would probably work better for the bolt as well (so you can pass the queue name, or maybe even a different ZK broker list if wanted). Like: collector.emit("stream1", new Values(v1)); collector.emit("stream2", new Values(v2)); and I'll bind each stream to a different Kafka bolt to get each message sent to the right queue. Haralds
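The reported bug can be demonstrated in miniature. The sketch below (Python, with stand-in names; it is not the real PartitionManager code) shows why a value compared with itself can never trigger a commit, and what the suggested _committedTo comparison does instead:

```python
# Sketch only: stand-in names, not the real storm-kafka PartitionManager.

def should_commit_buggy(committed_to, last_completed_offset):
    # As reported at line 217: the value is compared with itself,
    # so the condition is always False and the offset is never saved.
    return last_completed_offset != last_completed_offset

def should_commit_fixed(committed_to, last_completed_offset):
    # Suggested fix: commit only when the completed offset has moved.
    return committed_to != last_completed_offset

assert should_commit_buggy(100, 250) is False   # never commits, even when behind
assert should_commit_fixed(100, 250) is True    # commits when offsets differ
assert should_commit_fixed(250, 250) is False   # no redundant commit
```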
Re: Optimizing Kafka Stream
Thanks for the tips Chi, I'm a little confused about the partitioning. I had thought that the number of partitions was determined by the amount of parallelism in the topology. For example, if I said .parallelismHint(4), then I would have 4 different partitions. Is this not the case? Is there a set number of partitions my topology has that I need to increase in order to have higher parallelism? Thanks

On Sat, May 31, 2014 at 11:50 AM, Chi Hoang c...@groupon.com wrote: Raphael, You can try tuning your parallelism (and num workers). For Kafka 0.7, your spout parallelism can max out at: # brokers x # partitions (for the topic). If you have 4 Kafka brokers and your topic has 5 partitions, then you could set the spout parallelism to 20 to maximize throughput. For Kafka 0.8+, your spout parallelism can max out at the # of partitions for the topic, so if your topic has 5 partitions, you would set the spout parallelism to 5. To increase parallelism, you would need to increase the number of partitions for your topic (by using the add-partitions utility). As for the number of workers, setting it to 1 means that your spout will only run on a single Storm node, and would likely share resources with other Storm processes (spouts and bolts). I recommend increasing the number of workers so Storm has a chance to spread out the work and keep a good balance. Hope this helps. Chi

On Fri, May 30, 2014 at 4:24 PM, Raphael Hsieh raffihs...@gmail.com wrote: I am in the process of optimizing my stream. Currently I expect 5 000 000 tuples to come out of my spout per minute. I am trying to beef up my topology in order to process this in real time without falling behind. For some reason my batch size is capping out at 83 thousand tuples; I can't seem to make it any bigger. The processing time doesn't seem to get any smaller than 2-3 seconds either. I'm not sure how to configure the topology to get any faster / more efficient. Currently all the topology does is a groupBy on time and an aggregation (Count) to aggregate everything. Here are some data points I've figured out (batch size, num-workers, parallelismHint => result):

    5mb, 1, 2 = 83k tuples / 6s
    10mb, 1, 2 = 83k / 7s
    5mb, 1, 4 = 83k / 6s
    5mb, 2, 4 = 83k / 3s
    5mb, 3, 6 = 83k / 3s
    10mb, 3, 6 = 83k / 3s

Can anybody help me figure out how to get it to process things faster? My maxSpoutPending is at 1; when I increased it to 2 it was the same. MessageTimeoutSec = 100. I've been following this blog: https://gist.github.com/mrflip/5958028 to an extent, not everything word for word though. I need to be able to process around 66,000 tuples per second and I'm starting to run out of ideas. Thanks -- Raphael Hsieh
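Some hedged back-of-envelope arithmetic on the numbers in this thread (a sanity check, not a diagnosis): 5,000,000 tuples per minute is roughly 83,333 tuples per second, which is suspiciously close to the ~83k batch cap, consistent with the spout handing over about one second of data per batch; and with maxSpoutPending = 1 (one batch in flight) at ~3 s per batch, the observed throughput falls well short of the stated 66,000/s target:

```python
# Arithmetic on the figures quoted in the thread; nothing measured here.

tuples_per_minute = 5_000_000
incoming_per_second = tuples_per_minute / 60
print(round(incoming_per_second))        # 83333 -- matches the ~83k batch cap

# One ~83k-tuple batch in flight at a time (maxSpoutPending = 1),
# each taking about 3 seconds in the best configurations reported:
observed_per_second = 83_000 / 3
print(round(observed_per_second))        # 27667 -- vs. the 66,000/s target
```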
Writing Bolts in Python
Hi all, I am experimenting with writing bolts in Python and was wondering how the relationship between the Java and Python code works. For example, I have a Python bolt that looks like this:

    from collections import defaultdict
    import storm

    class ScanCountBolt(storm.BasicBolt):

        def __init__(self):
            #super(ScanCountBolt, self).__init__(script='scancount.py')
            self._count = defaultdict(int)

        def process(self, tup):
            product = tup.values[0]
            self._count[product] += 1
            storm.emit([product, self._count[product]])

    ScanCountBolt().run()

And my corresponding Java code looks like this:

    public static class ScanCount extends ShellBolt implements IRichBolt {

        public ScanCount() {
            super("python", "scancount.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("product", "scans"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

Is that all I need to make it work, or do I need to declare the data structures in the Java code as well? I am a bit confused. -Ashu
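Setting the Java/Python wiring aside, the counting logic of the Python bolt can be exercised on its own. This is only a sketch with stand-ins for storm.emit and Storm's tuple type; the real bolt runs under the Java ShellBolt wrapper via the multilang protocol:

```python
# Sketch: exercises the ScanCountBolt counting logic in isolation.
# FakeTuple and the `emitted` list are stand-ins, not Storm APIs.
from collections import defaultdict

emitted = []  # captures what storm.emit() would have sent

class FakeTuple:
    def __init__(self, values):
        self.values = values

class ScanCountLogic:
    def __init__(self):
        self._count = defaultdict(int)

    def process(self, tup):
        # Same logic as the bolt: count per product, emit running total.
        product = tup.values[0]
        self._count[product] += 1
        emitted.append([product, self._count[product]])

bolt = ScanCountLogic()
bolt.process(FakeTuple(["widget"]))
bolt.process(FakeTuple(["widget"]))
bolt.process(FakeTuple(["gadget"]))
assert emitted == [["widget", 1], ["widget", 2], ["gadget", 1]]
```

Note that each emitted list has two elements, which lines up with the two output fields ("product", "scans") declared on the Java side.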
Re: Storm with RDBMS
For parallel reads of massive historical data and high-volume writes you could use a distributed DB with a SQL layer such as Apache HBase + Phoenix http://phoenix.incubator.apache.org/; I think it might complement Storm nicely. On Mon, Jun 2, 2014 at 10:19 AM, Nathan Leung ncle...@gmail.com wrote: Something like memcached is commonly used for this scenario. Is memcached poorly suited for your goals or data access patterns? On Mon, Jun 2, 2014 at 10:06 AM, Balakrishna R balakrishn...@spanservices.com wrote: Hi, We are evaluating Apache Storm for one of our business use cases. In this use case, the incoming transactions/stream should be processed by a set of rules or logic. In this process, there is a need to consider historical data (maybe 2 weeks or a month old) as well. We understand that Storm will give better performance for processing the incoming transactions in real time. What if we have to read the historical data from an RDBMS and use that data in the bolts? Will this degrade the performance of the whole cluster (as RDBMS systems might cause some delay due to the high load of reads from the different parallelized bolts)? Any suggestions for handling this situation? Please share. Thanks Balakrishna DISCLAIMER: This email message and all attachments are confidential and may contain information that is Privileged, Confidential or exempt from disclosure under applicable law. If you are not the intended recipient, you are notified that any dissemination, distribution or copying of this email is strictly prohibited. If you have received this email in error, please notify us immediately by return email to mailad...@spanservices.com and destroy the original message. Opinions, conclusions and other information in this message that do not relate to the official business of SPAN shall be understood to be neither given nor endorsed by SPAN.
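The memcached suggestion above is essentially a cache-aside pattern: bolts consult a fast cache first and fall back to the RDBMS only on a miss, so the parallel bolts do not hammer the database. A minimal sketch (a plain dict stands in for memcached, and the lookup function for the expensive RDBMS query; all names are illustrative):

```python
# Sketch of cache-aside reads for historical data. A dict stands in for
# memcached; `db_lookup` stands in for the expensive RDBMS query.

class HistoricalStore:
    def __init__(self, db_lookup):
        self._db_lookup = db_lookup   # called only on a cache miss
        self._cache = {}              # stand-in for memcached
        self.db_hits = 0              # counts actual RDBMS round-trips

    def get(self, key):
        if key not in self._cache:
            self.db_hits += 1
            self._cache[key] = self._db_lookup(key)
        return self._cache[key]

store = HistoricalStore(lambda k: {"customer": k, "history": []})
store.get("c42")
store.get("c42")
assert store.db_hits == 1   # second read is served from the cache
```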
Re: Optimizing Kafka Stream
Raphael, The number of partitions is defined in your Kafka configuration - http://kafka.apache.org/documentation.html#brokerconfigs (num.partitions) - or when you create the topic. The behavior is different for each version of Kafka, so you should read more documentation. Your topology needs to match the Kafka configuration for the topic. Chi -- Data Systems Engineering data-syst...@groupon.com
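Chi's sizing rule from earlier in the thread can be written down as arithmetic (the function name here is illustrative, not a Storm API):

```python
# Maximum useful KafkaSpout parallelism, per the rule quoted in the thread:
# Kafka 0.7: brokers x partitions; Kafka 0.8+: partitions only.

def max_spout_parallelism(kafka_version, brokers, partitions):
    if kafka_version.startswith("0.7"):
        return brokers * partitions
    return partitions  # 0.8+: bounded by the topic's partition count alone

assert max_spout_parallelism("0.7", 4, 5) == 20   # the example from the thread
assert max_spout_parallelism("0.8", 4, 5) == 5
```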
Re: Optimizing Kafka Stream
Oh ok. Thanks Chi! Do you have any ideas about why my batch size never seems to get any bigger than 83K tuples ? Currently I'm just using a barebones topology that looks like this: Stream spout = topology.newStream(..., ...) .parallelismHint() .groupBy(new Fields(time)) .aggregate(new Count(), new Fields(Count)) .parallelismHint() .each(new Fields(time, count), new PrintFilter()); All the stream is doing is aggregating on like timestamps and printing out the count. in my config I've set batch size to 10mb like so: Config config = new Config(); config.(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 1024*1024*10); when I have the batch size to 5mb or even 1mb there is no difference, everything always adds up to roughly 83K tuples. in order to count up how many tuples are in the batch, I take a look at the system timestamp of when things are printed out (in the print filter) and all the print statements that have the same timestamp, I add the count values up together. On Mon, Jun 2, 2014 at 10:09 AM, Chi Hoang c...@groupon.com wrote: Raphael, The number of partitions is defined in your Kafka configuration - http://kafka.apache.org/documentation.html#brokerconfigs (num.partitions) - or when you create the topic. The behavior is different for each version of Kafka, so you should read more documentation. Your topology needs to match the Kafka configuration for the topic. Chi On Mon, Jun 2, 2014 at 8:46 AM, Raphael Hsieh raffihs...@gmail.com wrote: Thanks for the tips Chi, I'm a little confused about the partitioning. I had thought that the number of partitions was determined by the amount of parallelism in the topology. For example if I said .parallelismHint(4), then I would have 4 different partitions. Is this not the case ? Is there a set number of partitions my topology has that I need to increase in order to have higher parallelism ? Thanks On Sat, May 31, 2014 at 11:50 AM, Chi Hoang c...@groupon.com wrote: Raphael, You can try tuning your parallelism (and num workers). 
For Kafka 0.7, your spout parallelism could max out at: # brokers x # partitions (for the topic). If you have 4 Kafka brokers, and your topic has 5 partitions, then you could set the spout parallelism to 20 to maximize the throughput. For Kafka 0.8+, your spout parallelism could max out at # partitions for the topic, so if your topic has 5 partitions, then you would set the spout parallelism to 5. To increase parallelism, you would need to increase the number of partitions for your topic (by using the add partitions utility). As for the number of workers, setting it to 1 means that your spout will only run on a single Storm node, and would likely share resources with other Storm processes (spouts and bolts). I recommend to increase the number of workers so Storm has a chance to spread out the work, and keep a good balance. Hope this helps. Chi On Fri, May 30, 2014 at 4:24 PM, Raphael Hsieh raffihs...@gmail.com wrote: I am in the process of optimizing my stream. Currently I expect 5 000 000 tuples to come out of my spout per minute. I am trying to beef up my topology in order to process this in real time without falling behind. For some reason my batch size is capping out at 83 thousand tuples. I can't seem to make it any bigger. the processing time doesn't seem to get any smaller than 2-3 seconds either. I'm not sure how to configure the topology to get any faster / more efficient. Currently all the topology does is a groupby on time and an aggregation (Count) to aggregate everything. Here are some data points i've figured out. Batch Size:5mb num-workers: 1 parallelismHint: 2 (I'll write this a 5mb, 1, 2) 5mb, 1, 2 = 83K tuples / 6s 10mb, 1, 2 = 83k / 7s 5mb, 1, 4 = 83k / 6s 5mb, 2, 4 = 83k / 3s 5mb, 3, 6 = 83k / 3s 10mb, 3, 6 = 83k / 3s Can anybody help me figure out how to get it to process things faster ? My maxSpoutPending is at 1, but when I increased it to 2 it was the same. 
MessageTimeoutSec = 100. I've been following this blog: https://gist.github.com/mrflip/5958028 to an extent, though not everything word for word. I need to be able to process around 66,000 tuples per second, and I'm starting to run out of ideas. Thanks

--
Raphael Hsieh
Data Systems Engineering
data-syst...@groupon.com
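One arithmetic check is worth making on the numbers in this thread: 5,000,000 tuples per minute is roughly 83,333 tuples per second, which matches the observed 83K batch ceiling suspiciously well. If the spout ends up emitting about one batch per second (plausible with maxSpoutPending at 1), the batch size would be bounded by the arrival rate rather than by MAX_BATCH_SIZE_CONF. The per-second framing here is an assumption, not something confirmed in the thread; the arithmetic itself is just:

```python
# Sanity check: is the 83K batch cap simply the per-second arrival rate?
tuples_per_minute = 5_000_000
tuples_per_second = tuples_per_minute // 60

print(tuples_per_second)  # 83333 -- matches the ~83K batch ceiling
```

If that hypothesis holds, growing batches further means raising maxSpoutPending or the emit interval, not the byte limit.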
Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?
Yes, if I used prepare or open on spouts or bolts it would work, but unfortunately it would be a bit brittle: I'd have to include a spout or bolt just for initializing my invariant code. I'd rather do that when the topology is activated on the worker, so this seems like a good use of an activated() method on the StormTopology class (where activated() would be called after the StormTopology is deserialized by the worker node process). But if there is no such method, I will make do with what is there. Thanks for your response.

Chris

On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com wrote:

The bolt base classes have a prepare method: https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html and the spout base classes have a similar activate method: https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html Is that sufficient for your needs, or were you thinking of something different? Marc

On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:

Hi there - I would like to set up some state that spouts and bolts share, and I'd like to prepare this state when the StormTopology gets 'activated' on a worker. It would be great if the StormTopology had something like a prepare or open method to indicate when it is starting. I looked, but I could find no such API. Maybe I should submit an enhancement request? Thanks in advance for your responses, - Chris

[If anyone is curious, the shared state is for all my application code to check or not check invariants. The invariant checking takes additional time, so we don't want to do it in production, but during testing/development it helps catch bugs.]

--
Chris Bedford
Founder & Lead Lackey
Build Lackey Labs: http://buildlackey.com
Go Grails!: http://blog.buildlackey.com
Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?
You don't have to include a specific bolt for init code. It's not difficult to push your init code into a separate class and call it from your bolts: lock on that class, run init, and then allow other instances to skip over it.

Without changing bolt/spout code, I've taken to including a task hook for init code (e.g. properties / Guice). Check out BaseTaskHook; it's easily extensible and can be included pretty easily too:

    stormConfig.put(Config.TOPOLOGY_AUTO_TASK_HOOKS,
        Lists.newArrayList(MyTaskHook.class.getName()));

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Mon, Jun 2, 2014 at 7:25 PM, Chris Bedford ch...@buildlackey.com wrote:

Yes, if I used prepare or open on spouts or bolts it would work, but unfortunately it would be a bit brittle: I'd have to include a spout or bolt just for initializing my invariant code. I'd rather do that when the topology is activated on the worker, so this seems like a good use of an activated() method on the StormTopology class (where activated() would be called after the StormTopology is deserialized by the worker node process). But if there is no such method, I will make do with what is there. Thanks for your response.

Chris

On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com wrote:

The bolt base classes have a prepare method: https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html and the spout base classes have a similar activate method: https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html Is that sufficient for your needs, or were you thinking of something different? Marc

On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:

Hi there - I would like to set up some state that spouts and bolts share, and I'd like to prepare this state when the StormTopology gets 'activated' on a worker.
It would be great if the StormTopology had something like a prepare or open method to indicate when it is starting. I looked, but I could find no such API. Maybe I should submit an enhancement request? Thanks in advance for your responses, - Chris

[If anyone is curious, the shared state is for all my application code to check or not check invariants. The invariant checking takes additional time, so we don't want to do it in production, but during testing/development it helps catch bugs.]

--
Chris Bedford
Founder & Lead Lackey
Build Lackey Labs: http://buildlackey.com
Go Grails!: http://blog.buildlackey.com
Re: Writing Bolts in Python
The ShellBolt looks for scancount.py in the resources/ directory in your JAR, which will be extracted to each worker machine. It then simply invokes "python scancount.py" in that directory. So you need to make sure the scancount.py file will be on the classpath under resources/, as well as the storm.py interop library it depends upon.

Based on the official word count example https://github.com/apache/incubator-storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L40-L56, your Java ShellBolt definition looks OK.

The storm.py interop library that you're probably using then communicates with the rest of Storm via the Multi-lang Protocol https://storm.incubator.apache.org/documentation/Multilang-protocol.html. This means your Python process is really sending JSON messages over stdout and receiving JSON messages over stdin. That's the relationship between Python and Java (Storm) in this case.

A library I'm working on with my team, streamparse https://github.com/Parsely/streamparse, makes this workflow easier by bundling a command-line tool for building/submitting/running Python topologies. For example, getting a Storm + Python word count example to run locally is just a matter of:

    sparse quickstart wordcount
    cd wordcount
    sparse run

It also eliminates the need to write the Java glue code you're putting together here. It's still in early development, but we're already using it for real Storm 0.8 and 0.9 production clusters and for local development.

---
Andrew Montalenti
Co-Founder & CTO
http://parse.ly

On Mon, Jun 2, 2014 at 12:37 PM, Ashu Goel a...@shopkick.com wrote:

Hi all, I am experimenting with writing bolts in Python and was wondering how the relationship between the Java and Python code works.
For example, I have a Python bolt that looks like this:

    import storm
    from collections import defaultdict

    class ScanCountBolt(storm.BasicBolt):

        def __init__(self):
            # super(ScanCountBolt, self).__init__(script='scancount.py')
            self._count = defaultdict(int)

        def process(self, tup):
            product = tup.values[0]
            self._count[product] += 1
            storm.emit([product, self._count[product]])

    ScanCountBolt().run()

And my corresponding Java code looks like this:

    public static class ScanCount extends ShellBolt implements IRichBolt {

        public ScanCount() {
            super("python", "scancount.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("product", "scans"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

Is that all I need to make it work, or do I need to declare the data structures in the Java code as well? I am a bit confused...

-Ashu
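The JSON-over-stdin/stdout relationship Andrew describes can be made concrete. Below is a simplified, hedged sketch of the framing used by Storm's multilang protocol, where each JSON message is followed by a line containing only "end"; the helper names encode_message and decode_message are illustrative, not part of storm.py:

```python
import json

def encode_message(msg):
    # Frame a dict the way the multilang protocol does:
    # a JSON object, then a line containing only "end".
    return json.dumps(msg) + "\nend\n"

def decode_message(raw):
    # Strip the "end" terminator and parse the JSON body back to a dict.
    body = raw.split("\nend\n", 1)[0]
    return json.loads(body)

# An emit from the Python bolt travels over stdout roughly as:
framed = encode_message({"command": "emit", "tuple": ["widget", 3]})
roundtrip = decode_message(framed)
print(roundtrip["tuple"])  # ['widget', 3]
```

storm.py hides this framing behind storm.emit(...), which is why the Python bolt code never touches stdin/stdout directly.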
Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?
This looks promising, thanks. Hope you don't mind one more question -- if I create my own implementation of ITaskHook and add it to the config as you illustrated in your previous message, will the prepare() method of my implementation be called exactly once, shortly after the StormTopology is deserialized by the worker node process?

- Chris

On Mon, Jun 2, 2014 at 7:09 PM, Michael Rose mich...@fullcontact.com wrote:

You don't have to include a specific bolt for init code. It's not difficult to push your init code into a separate class and call it from your bolts: lock on that class, run init, and then allow other instances to skip over it. Without changing bolt/spout code, I've taken to including a task hook for init code (e.g. properties / Guice). Check out BaseTaskHook; it's easily extensible and can be included pretty easily too:

    stormConfig.put(Config.TOPOLOGY_AUTO_TASK_HOOKS,
        Lists.newArrayList(MyTaskHook.class.getName()));

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Mon, Jun 2, 2014 at 7:25 PM, Chris Bedford ch...@buildlackey.com wrote:

Yes, if I used prepare or open on spouts or bolts it would work, but unfortunately it would be a bit brittle: I'd have to include a spout or bolt just for initializing my invariant code. I'd rather do that when the topology is activated on the worker, so this seems like a good use of an activated() method on the StormTopology class (where activated() would be called after the StormTopology is deserialized by the worker node process). But if there is no such method, I will make do with what is there. Thanks for your response.
Chris

On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com wrote:

The bolt base classes have a prepare method: https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html and the spout base classes have a similar activate method: https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html Is that sufficient for your needs, or were you thinking of something different? Marc

On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:

Hi there - I would like to set up some state that spouts and bolts share, and I'd like to prepare this state when the StormTopology gets 'activated' on a worker. It would be great if the StormTopology had something like a prepare or open method to indicate when it is starting. I looked, but I could find no such API. Maybe I should submit an enhancement request? Thanks in advance for your responses, - Chris

[If anyone is curious, the shared state is for all my application code to check or not check invariants. The invariant checking takes additional time, so we don't want to do it in production, but during testing/development it helps catch bugs.]

--
Chris Bedford
Founder & Lead Lackey
Build Lackey Labs: http://buildlackey.com
Go Grails!: http://blog.buildlackey.com
Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?
No, it will be called per bolt instance. That's why init code needs to be guarded behind a double-checked lock to guarantee it only executes once per JVM, e.g.:

    private static volatile boolean initialized = false;
    ...
    if (!initialized) {
        synchronized (MyInitCode.class) {
            if (!initialized) {
                // do stuff
                initialized = true;
            }
        }
    }

Until there's a set of lifecycle hooks, that's about as good as I've cared to make it.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Mon, Jun 2, 2014 at 8:27 PM, Chris Bedford ch...@buildlackey.com wrote:

This looks promising, thanks. Hope you don't mind one more question -- if I create my own implementation of ITaskHook and add it to the config as you illustrated in your previous message, will the prepare() method of my implementation be called exactly once, shortly after the StormTopology is deserialized by the worker node process?

- Chris

On Mon, Jun 2, 2014 at 7:09 PM, Michael Rose mich...@fullcontact.com wrote:

You don't have to include a specific bolt for init code. It's not difficult to push your init code into a separate class and call it from your bolts: lock on that class, run init, and then allow other instances to skip over it. Without changing bolt/spout code, I've taken to including a task hook for init code (e.g. properties / Guice). Check out BaseTaskHook; it's easily extensible and can be included pretty easily too:

    stormConfig.put(Config.TOPOLOGY_AUTO_TASK_HOOKS,
        Lists.newArrayList(MyTaskHook.class.getName()));

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Mon, Jun 2, 2014 at 7:25 PM, Chris Bedford ch...@buildlackey.com wrote:

Yes, if I used prepare or open on spouts or bolts it would work, but unfortunately it would be a bit brittle: I'd have to include a spout or bolt just for initializing my invariant code.
I'd rather do that when the topology is activated on the worker, so this seems like a good use of an activated() method on the StormTopology class (where activated() would be called after the StormTopology is deserialized by the worker node process). But if there is no such method, I will make do with what is there. Thanks for your response.

Chris

On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com wrote:

The bolt base classes have a prepare method: https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html and the spout base classes have a similar activate method: https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html Is that sufficient for your needs, or were you thinking of something different? Marc

On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:

Hi there - I would like to set up some state that spouts and bolts share, and I'd like to prepare this state when the StormTopology gets 'activated' on a worker. It would be great if the StormTopology had something like a prepare or open method to indicate when it is starting. I looked, but I could find no such API. Maybe I should submit an enhancement request? Thanks in advance for your responses, - Chris

[If anyone is curious, the shared state is for all my application code to check or not check invariants. The invariant checking takes additional time, so we don't want to do it in production, but during testing/development it helps catch bugs.]

--
Chris Bedford
Founder & Lead Lackey
Build Lackey Labs: http://buildlackey.com
Go Grails!: http://blog.buildlackey.com
Re: Topology acked/emitted count reset
Hi Andrew, From what I read in the code, executor.clj (worker) is responsible for updating the stats for bolts and spouts. If a worker is restarted -- as may happen when a topology is rebalanced -- there is a chance of losing the stats. Topology stats are derived from the spout and bolt stats; no separate stats are kept for the topology itself. So if a worker/supervisor died and restarted on another node, the stats for that supervisor's workers are lost. Thanks, Harsha

On Mon, Jun 2, 2014, at 07:07 PM, Andrew Montalenti wrote:

Attached you'll find two screenshots from the Storm UI, one taken this morning and one taken just recently. The Storm topology in question -- "cass" -- was not restarted in between. You can see the uptime is 13h (storm_ui_healthy.png) and 26h (storm_ui_num_reset.png), respectively. Yet notice that in the later screenshot, the acked counter for the all-time window has dropped from 27.2 million to 3.9 million. All the other counts have also dropped. What explains this? Shouldn't all-time emit/ack counts for a topology that's been running 26h non-stop always be greater than for the same topology 13h earlier? This is with Storm 0.9.1-incubating.

---
Andrew Montalenti
Co-Founder & CTO
http://parse.ly

Email had 2 attachments:
* storm_ui_num_reset.png - 566k (image/png)
* storm_ui_healthy.png - 541k (image/png)
Custom metrics using IMetrics interface
Hi all, I am working on a project to build an order-processing pipeline on top of Storm. In order to measure performance, one requirement is that every spout/bolt, for every order it processes, generate a custom metric of the form (order id, order entry timestamp in milliseconds, order exit timestamp in milliseconds), and then use a metrics consumer to collect and further process those metrics. I will greatly appreciate it if anyone can share some ideas on how to implement this requirement.

My question is: getValueAndReset in the IMetric interface is a callback method. It looks to me like, in order to collect the metrics through the IMetric interface, I will have to use an in-memory queue to temporarily store the metrics generated while orders are processed in any spout or bolt, and wait for getValueAndReset to be called. Any better idea? Thanks, James
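James's buffering idea can be sketched concretely. The real interface is Java's IMetric, whose getValueAndReset() returns whatever was accumulated since the last metrics tick and clears it; below is a hedged Python sketch of a queue-backed metric that spouts/bolts append to as each order passes through and that the callback drains. QueuedMetric and its method names are illustrative, not Storm APIs:

```python
import threading

class QueuedMetric:
    """Buffers per-order records between metric ticks.
    get_value_and_reset mirrors IMetric.getValueAndReset():
    return everything accumulated so far and start fresh."""

    def __init__(self):
        self._lock = threading.Lock()
        self._records = []

    def record(self, order_id, entered_ms, exited_ms):
        # Called from execute()/nextTuple() as each order is processed.
        with self._lock:
            self._records.append((order_id, entered_ms, exited_ms))

    def get_value_and_reset(self):
        # Called by the metrics framework on its bucket interval;
        # swap the buffer out under the lock so no records are lost.
        with self._lock:
            drained, self._records = self._records, []
        return drained

m = QueuedMetric()
m.record("order-1", 1000, 1420)
m.record("order-2", 1005, 1630)
print(m.get_value_and_reset())  # [('order-1', 1000, 1420), ('order-2', 1005, 1630)]
print(m.get_value_and_reset())  # [] -- buffer was reset
```

The swap-under-lock in get_value_and_reset is the key detail: it makes the drain atomic with respect to concurrent record() calls, so the memory-queue approach James describes is safe without extra machinery.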