Storm-kafka Integration

2014-06-02 Thread Komal Thombare
 Hi,

I am trying to integrate Storm with Kafka, but could not find any proper
documentation for doing so.
Can anyone please help me get started with the integration?

Thanks and Regards,

Komal Thombare

=-=-=
Notice: The information contained in this e-mail
message and/or attachments to it may contain 
confidential or privileged information. If you are 
not the intended recipient, any dissemination, use, 
review, distribution, printing or copying of the 
information contained in this e-mail message 
and/or attachments to it are strictly prohibited. If 
you have received this communication in error, 
please notify us by reply e-mail or telephone and 
immediately and permanently delete the message 
and any attachments. Thank you




Re: Storm-kafka Integration

2014-06-02 Thread Andres Gomez
Do you use Trident or only Storm?

Regards,

Andres


Re: Storm-kafka Integration

2014-06-02 Thread Komal Thombare
 Only Storm

Thanks and Regards,

Komal Thombare
Tata Consultancy Services Limited
Ph:- 086-55388772
Mail-to: komal.thomb...@tcs.com
Website: http://www.tcs.com

Experience certainty. IT Services
Business Solutions
Consulting





Re: Storm-kafka Integration

2014-06-02 Thread Deepak Sharma
Hi Komal
Have you looked at KafkaSpout?

Thanks
Deepak


On Mon, Jun 2, 2014 at 2:13 PM, Komal Thombare komal.thomb...@tcs.com
wrote:

 Hi,

 I am trying to integrate storm with kafka, but did not get any proper
 documentation to do so.
 Can anyone please help me for getting started with the integration.

 Thanks and Regards,

 Komal Thombare

 =-=-=
 Notice: The information contained in this e-mail
 message and/or attachments to it may contain
 confidential or privileged information. If you are
 not the intended recipient, any dissemination, use,
 review, distribution, printing or copying of the
 information contained in this e-mail message
 and/or attachments to it are strictly prohibited. If
 you have received this communication in error,
 please notify us by reply e-mail or telephone and
 immediately and permanently delete the message
 and any attachments. Thank you




-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: Storm-kafka Integration

2014-06-02 Thread Komal Thombare
 Hi Deepak,

Yes, I have. I also have the storm-contrib source code, but I am
unaware of how to compile it.

Thanks and Regards,

Komal Thombare




Re: Storm-kafka Integration

2014-06-02 Thread Joe Stein
Please take a look at
http://www.michael-noll.com/blog/2014/05/27/kafka-storm-integration-example-tutorial/#state-of-the-integration-game

There is a github project too https://github.com/miguno/kafka-storm-starter

This covers the latest Storm, Kafka, and Avro.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/




Re: Storm-kafka Integration

2014-06-02 Thread Andres Gomez
You should make a clone of this project:

https://github.com/apache/incubator-storm/tree/master/external/storm-kafka

and run "mvn install". I suppose you are using Kafka 0.8.x.

and then you do this:

// SpoutConfig has no no-arg constructor; it takes (BrokerHosts, topic, zkRoot, id)
SpoutConfig spoutConfig = new SpoutConfig(
        new ZkHosts("localhost:2181"), // ZooKeeper connect string (host:port)
        "myTopic",                     // Kafka topic to read
        "/kafkaStorm",                 // ZooKeeper root path for offset storage
        "storm");                      // consumer id

// This config is an example!!

KafkaSpout spout = new KafkaSpout(spoutConfig);
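For completeness, a minimal wiring sketch assuming the storm-kafka API from the repo above; the topic name, ZooKeeper address, and component ids below are placeholders, not from the thread:

```java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class KafkaTopologySketch {
    public static void main(String[] args) throws Exception {
        // Brokers are discovered from ZooKeeper; offsets are stored
        // under /kafkaStorm with consumer id "storm".
        SpoutConfig spoutConfig = new SpoutConfig(
                new ZkHosts("localhost:2181"), "myTopic", "/kafkaStorm", "storm");

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        // Attach downstream bolts here, e.g.:
        // builder.setBolt("printer", new PrinterBolt()).shuffleGrouping("kafka-spout");

        // Local mode for testing; use StormSubmitter for a real cluster.
        new LocalCluster().submitTopology("kafka-test", new Config(),
                builder.createTopology());
    }
}
```

This needs the storm-core and storm-kafka jars on the classpath and a reachable Kafka/ZooKeeper, so it is a sketch rather than a standalone program.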





Worker dies (bolt)

2014-06-02 Thread Margusja

Hi

I am using apache-storm-0.9.1-incubating.
I have a simple topology: a spout reads from a Kafka topic and a bolt writes
lines from the spout to HBase.


Recently we did a test: we sent 300 000 000 messages through kafka-rest -
kafka-queue - storm topology - hbase.


I noticed that after around one hour and around 2500 messages the worker died.
The PID is there and the process is up, but the bolt's execute method hangs.


The bolt's code is:
package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{
    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes(line), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The line System.out.println("Count: " + count); in the execute method was
added for debugging, to see in the log that the bolt is running.


In the spout's nextTuple() method I added a debug line:
System.out.println("Message from the Topic ...");

After some time, around 50 minutes, I can see in the log file that the spout
is working but the bolt has died.


Any ideas?

--
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)
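Since the worker PID is known, one generic way to see where the hung execute() is stuck is to take a thread dump of the worker JVM while it hangs (this diagnostic step is a suggestion, not from the thread; the PID and output path are placeholders):

```shell
# Dump all thread stacks of the Storm worker (replace <worker-pid>);
# look for the executor thread blocked inside the HBase client code.
jstack <worker-pid> > /tmp/worker-threads.txt
grep -B 2 -A 20 "HBaseWriterBolt" /tmp/worker-threads.txt
```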



Re: Storm-kafka Integration

2014-06-02 Thread Anis Nasir
Dear Komal,

I used the storm-kafka version mentioned in the link.

http://anisnasir.wordpress.com/2014/05/25/apache-storm-kafka-a-nice-choice/


Regards
Anis



On Mon, Jun 2, 2014 at 1:49 PM, Komal Thombare komal.thomb...@tcs.com
wrote:

 Hi Andres,

Still I am getting the same error.


 Thanks and Regards,

 Komal Thombare


 -Andres Gomez  wrote: -

 To: user@storm.incubator.apache.org
 From: Andres Gomez andresgome...@gmail.com
 Date: 06/02/2014 04:29PM

 Subject: Re: Storm-kafka Integration

 Hi again,

I advise you to build storm-kafka from this repo:


 https://github.com/apache/incubator-storm/tree/master/external/storm-kafka


 1. make a clone of this project
 2. mvn package (generates a jar of the project)
 3. mvn install (installs the project into the local repository)

 Regards,

 Andres

 On 02/06/2014, at 12:56, Komal Thombare komal.thomb...@tcs.com
 wrote:

  Hi,

 I have downloaded the storm-kafka jar from the below link:
 http://repo1.maven.org/maven2/com/n3twork/storm/storm-kafka/20140521/

 Now I have made changes in the Storm word-count topology and used
 KafkaSpout.
 While running the topology in local mode I get the following error:

 java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
 at storm.kafka.KafkaSpout.open(KafkaSpout.java:85)
 ~[storm-kafka-20140521.jar:na]
 at
 backtype.storm.daemon.executor$eval5100$fn__5101$fn__5116.invoke(executor.clj:519)
 ~[na:na]
 at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
 at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
 at java.lang.Thread.run(Thread.java:636) [na:1.6.0_17]
 Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy
 at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
 ~[na:1.6.0_17]
 at java.security.AccessController.doPrivileged(Native Method)
 ~[na:1.6.0_17]
 at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
 ~[na:1.6.0_17]
 at java.lang.ClassLoader.loadClass(ClassLoader.java:319) ~[na:1.6.0_17]
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
 ~[na:1.6.0_17]
 at java.lang.ClassLoader.loadClass(ClassLoader.java:264) ~[na:1.6.0_17]
 at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:332)
 ~[na:1.6.0_17]

 Even though curator-client-1.0.1.jar is present in Storm's lib directory and
 I have included it in my build path.


 Thanks and Regards,

 Komal Thombare



Strange latency behaviour using 1 core (in a multiple-core processor)

2014-06-02 Thread dzacu1a
Hi,
We are doing a benchmark test, limiting the number of cores used by Storm.
The topology contains 1 spout and 6 bolts. The first bolt has the highest load.
Our experiment: we run the topology on a single machine with 4 cores. We
start with 1 core and then increase to 2, 3 and 4 cores. We measure the
latency with a timestamp when the spout emits the tuple and when the tuple
reaches the last bolt.
The result: the latency results obtained from the tests with 2, 3 and 4 cores
are valid (as we expected, the latency varies from 10 ms to 20 ms); however
we got strange values from the tests with only 1 core that we cannot explain.
The latency varies from 4 ms to 120 ms.
Does anyone have any ideas why?
Thanks
Baoh

Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?

2014-06-02 Thread Marc Vaillant
The bolt base classes have a prepare method:

https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html

and the spout base classes have a similar activate method:

https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html

Is that sufficient for your needs or were you thinking of something
different?

Marc

On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:
 Hi there -
 
 I would like to set up some state that spouts and bolts share, and I'd like to
 prepare this state when the StormTopology gets 'activated' on a worker.
 
 It would be great if the StormTopology had something like a prepare or open
 method to indicate when it is starting. I looked, but I could find no such
 API. Maybe I should submit an enhancement request?
 
 Thanks in advance for your responses,
   -  Chris
 
 
 
 [If anyone is curious, the shared state is for all my application code to
 check or not check invariants. The invariant checking takes additional time,
 so we don't want to do it in production, but during testing/development it
 helps catch bugs.]
 
 --
 Chris Bedford
 
 Founder & Lead Lackey
 Build Lackey Labs:  http://buildlackey.com
 Go Grails!: http://blog.buildlackey.com
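If the per-component hooks Marc mentions are enough, the shared state can be initialized lazily and idempotently from every spout's open() and every bolt's prepare(). A Storm-free sketch of that pattern follows; the class name and the "check.invariants" config key are illustrative, not part of any Storm API:

```java
import java.util.Map;

// Call InvariantChecks.initOnce(stormConf) from every spout's open() and
// every bolt's prepare(). Only the first call in a worker JVM takes effect,
// so it does not matter which component happens to be prepared first.
class InvariantChecks {
    private static volatile boolean initialized = false;
    private static volatile boolean enabled = false;

    static synchronized void initOnce(Map<?, ?> stormConf) {
        if (initialized) {
            return; // later spouts/bolts in the same worker are no-ops
        }
        // Read the (hypothetical) flag from the topology configuration.
        enabled = Boolean.TRUE.equals(stormConf.get("check.invariants"));
        initialized = true;
    }

    static boolean isEnabled() {
        return enabled;
    }
}
```

Since the topology Config is serialized out to every worker, this gives one initialization per worker JVM without any topology-level "on activate" hook.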
 
 


storm-kafka external project

2014-06-02 Thread Haralds Ulmanis
First, there is a small typo-like error in:
https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
line 217:
 if (lastCompletedOffset != lastCompletedOffset) {
I guess there should be something like  if (_committedTo !=
lastCompletedOffset) {  instead;
without that it will never save the position information.

Next thing is the Kafka bolt.
Getting the topic name from the global configuration is probably not the
best way: I may want to attach two bolts, each sending to a
different queue. Something like SpoutConfig for the spout would probably work
better for the bolt as well (so you can pass the queue name, or maybe even a
different broker list if wanted),
like:
collector.emit("stream1", new Values(v1));
collector.emit("stream2", new Values(v2));
and I'll bind each stream to a different Kafka bolt to get messages sent to
the right queue.

Haralds


Re: Optimizing Kafka Stream

2014-06-02 Thread Raphael Hsieh
Thanks for the tips Chi,
I'm a little confused about the partitioning. I had thought that the number
of partitions was determined by the amount of parallelism in the topology.
For example if I said .parallelismHint(4), then I would have 4 different
partitions. Is this not the case ?
Is there a set number of partitions my topology has that I need to increase
in order to have higher parallelism ?

Thanks


On Sat, May 31, 2014 at 11:50 AM, Chi Hoang c...@groupon.com wrote:

 Raphael,
 You can try tuning your parallelism (and num workers).

 For Kafka 0.7, your spout parallelism could max out at: # brokers x #
 partitions (for the topic).  If you have 4 Kafka brokers, and your topic
 has 5 partitions, then you could set the spout parallelism to 20 to
 maximize the throughput.

 For Kafka 0.8+, your spout parallelism could max out at # partitions for
 the topic, so if your topic has 5 partitions, then you would set the spout
 parallelism to 5.  To increase parallelism, you would need to increase the
 number of partitions for your topic (by using the add partitions utility).

 As for the number of workers, setting it to 1 means that your spout will
 only run on a single Storm node, and would likely share resources with
 other Storm processes (spouts and bolts).  I recommend to increase the
 number of workers so Storm has a chance to spread out the work, and keep a
 good balance.

 Hope this helps.

 Chi


 On Fri, May 30, 2014 at 4:24 PM, Raphael Hsieh raffihs...@gmail.com
 wrote:

 I am in the process of optimizing my stream. Currently I expect 5 000 000
 tuples to come out of my spout per minute. I am trying to beef up my
 topology in order to process this in real time without falling behind.

 For some reason my batch size is capping out at 83 thousand tuples. I
 can't seem to make it any bigger. the processing time doesn't seem to get
 any smaller than 2-3 seconds either.
 I'm not sure how to configure the topology to get any faster / more
 efficient.

 Currently all the topology does is a groupby on time and an aggregation
 (Count) to aggregate everything.

 Here are some data points i've figured out.

 Batch Size:5mb
 num-workers: 1
 parallelismHint: 2
 (I'll write this as 5mb, 1, 2)

 5mb, 1, 2 = 83K tuples / 6s
 10mb, 1, 2 = 83k / 7s
 5mb, 1, 4 = 83k / 6s
 5mb, 2, 4 = 83k / 3s
 5mb, 3, 6 = 83k / 3s
 10mb, 3, 6 = 83k / 3s

 Can anybody help me figure out how to get it to process things faster ?

 My maxSpoutPending is at 1, but when I increased it to 2 it was the same.
 MessageTimeoutSec = 100

 I've been following this blog: https://gist.github.com/mrflip/5958028
 to an extent, not everything word for word though.

 I need to be able to process around 66,000 tuples per second and I'm
 starting to run out of ideas.

 Thanks

 --
 Raphael Hsieh








-- 
Raphael Hsieh


Writing Bolts in Python

2014-06-02 Thread Ashu Goel
Hi all,

I am experimenting with writing bolts in Python and was wondering how the 
relationship between the Java and Python code works. For example, I have a 
Python bolt that looks like this:

import storm
from collections import defaultdict

class ScanCountBolt(storm.BasicBolt):

  def __init__(self):
    #super(ScanCountBolt, self).__init__(script='scancount.py')
    self._count = defaultdict(int)

  def process(self, tup):
    product = tup.values[0]
    self._count[product] += 1
    storm.emit([product, self._count[product]])

ScanCountBolt().run()


And my corresponding Java code looks like this:

 public static class ScanCount extends ShellBolt implements IRichBolt {

    public ScanCount() {
      super("python", "scancount.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("product", "scans"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
 }

Is that all I need to make it work, or do I need to declare the data
structures in the Java code as well? I am a bit confused…

-Ashu

Re: Storm with RDBMS

2014-06-02 Thread alex kamil
For parallel reads of massive historical data and high-volume writes, you
could use a distributed DB with a SQL layer such as Apache HBase + Phoenix
(http://phoenix.incubator.apache.org/). I think it might complement Storm
nicely.


On Mon, Jun 2, 2014 at 10:19 AM, Nathan Leung ncle...@gmail.com wrote:

 Something like memcached is commonly used for this scenario.  Is memcached
 poorly suited for your goals or data access patterns?


 On Mon, Jun 2, 2014 at 10:06 AM, Balakrishna R 
 balakrishn...@spanservices.com wrote:

  Hi,



  We are evaluating 'Apache Storm' for one of our business use cases. In
  this use case, the incoming transactions/stream should be processed by a set
  of rules or logic. In this process, there is a need to consider historical
  data (maybe 2 weeks or a month old) as well.



  We understand that Storm will give better performance for processing the
  incoming transactions in real time. What if we have to read the historical
  data from an RDBMS and use that data in the bolts?

  Will this degrade the performance of the whole cluster (as RDBMS systems
  might cause some delay due to the high read load from the different bolts
  parallelized to achieve better performance)?



 Any suggestion on solving this situation? Please share.





 Thanks

 Balakrishna

 DISCLAIMER: This email message and all attachments are confidential and
 may contain information that is Privileged, Confidential or exempt from
 disclosure under applicable law. If you are not the intended recipient, you
 are notified that any dissemination, distribution or copying of this email
 is strictly prohibited.  If you have received this email in error, please
 notify us immediately by return email to mailad...@spanservices.com and
 destroy the original message.  Opinions, conclusions and other information
 in this message that do not relate to the official of SPAN, shall be
 understood to be nether given nor endorsed by SPAN.





Re: Optimizing Kafka Stream

2014-06-02 Thread Chi Hoang
Raphael,
The number of partitions is defined in your Kafka configuration -
http://kafka.apache.org/documentation.html#brokerconfigs (num.partitions) -
or when you create the topic.  The behavior is different for each version
of Kafka, so you should read more documentation.  Your topology needs to
match the Kafka configuration for the topic.

Chi
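For reference, a sketch of how a topic's partition count is inspected and increased with the Kafka 0.8.1+ command-line tools (the topic name and ZooKeeper address are placeholders; older 0.8.0 releases shipped a separate kafka-add-partitions.sh script instead):

```shell
# Show the current partition count for the topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mytopic

# Increase to 5 partitions, so the spout parallelism can be raised to 5
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --partitions 5
```

Note that adding partitions changes how keyed messages are distributed, so it is usually done before relying on key-based ordering.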









-- 
Data Systems Engineering
data-syst...@groupon.com


Re: Optimizing Kafka Stream

2014-06-02 Thread Raphael Hsieh
Oh ok. Thanks Chi!
Do you have any ideas about why my batch size never seems to get any bigger
than 83K tuples?
Currently I'm just using a barebones topology that looks like this:

Stream spout = topology.newStream(..., ...)
    .parallelismHint()
    .groupBy(new Fields("time"))
    .aggregate(new Count(), new Fields("count"))
    .parallelismHint()
    .each(new Fields("time", "count"), new PrintFilter());

All the stream does is aggregate on matching timestamps and print out
the count.

in my config I've set batch size to 10mb like so:
Config config = new Config();
config.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 1024*1024*10);

When I set the batch size to 5mb or even 1mb there is no difference;
everything always adds up to roughly 83K tuples.

In order to count how many tuples are in a batch, I look at the system
timestamp of when things are printed out (in the print filter), and for all
the print statements that have the same timestamp I add the count values
together.

On Mon, Jun 2, 2014 at 10:09 AM, Chi Hoang c...@groupon.com wrote:

 Raphael,
 The number of partitions is defined in your Kafka configuration -
 http://kafka.apache.org/documentation.html#brokerconfigs (num.partitions)
 - or when you create the topic.  The behavior is different for each version
 of Kafka, so you should read more documentation.  Your topology needs to
 match the Kafka configuration for the topic.
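As a sketch of matching the topology to the Kafka configuration, assuming the storm-kafka module's Trident spout classes (the ZooKeeper string, topic name, and partition count below are illustrative):

```java
// Point the spout at the topic and size its parallelism to the topic's
// partition count (here assumed to be 5).
BrokerHosts hosts = new ZkHosts("zkhost:2181");
TridentKafkaConfig spoutConfig = new TridentKafkaConfig(hosts, "events");
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConfig);

TridentTopology topology = new TridentTopology();
topology.newStream("events-spout", spout)
        .parallelismHint(5);  // one spout task per partition
```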

 Chi


 On Mon, Jun 2, 2014 at 8:46 AM, Raphael Hsieh raffihs...@gmail.com
 wrote:

 Thanks for the tips Chi,
 I'm a little confused about the partitioning. I had thought that the
 number of partitions was determined by the amount of parallelism in the
 topology. For example if I said .parallelismHint(4), then I would have 4
 different partitions. Is this not the case ?
 Is there a set number of partitions my topology has that I need to
 increase in order to have higher parallelism ?

 Thanks


 On Sat, May 31, 2014 at 11:50 AM, Chi Hoang c...@groupon.com wrote:

 Raphael,
 You can try tuning your parallelism (and num workers).

 For Kafka 0.7, your spout parallelism could max out at: # brokers x #
 partitions (for the topic).  If you have 4 Kafka brokers, and your topic
 has 5 partitions, then you could set the spout parallelism to 20 to
 maximize the throughput.

 For Kafka 0.8+, your spout parallelism could max out at # partitions for
 the topic, so if your topic has 5 partitions, then you would set the spout
 parallelism to 5.  To increase parallelism, you would need to increase the
 number of partitions for your topic (by using the add partitions utility).
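The two rules above can be written down as a small helper (numbers are the illustrative ones from the message: 4 brokers, 5 partitions):

```java
class SpoutParallelism {
    // Kafka 0.7: spout parallelism maxes out at brokers x partitions.
    static int maxForKafka07(int brokers, int partitionsPerTopic) {
        return brokers * partitionsPerTopic;
    }

    // Kafka 0.8+: spout parallelism maxes out at the partition count alone.
    static int maxForKafka08(int partitionsPerTopic) {
        return partitionsPerTopic;
    }

    public static void main(String[] args) {
        System.out.println(maxForKafka07(4, 5)); // 20
        System.out.println(maxForKafka08(5));    // 5
    }
}
```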

 As for the number of workers, setting it to 1 means that your spout will
 only run on a single Storm node, and would likely share resources with
 other Storm processes (spouts and bolts).  I recommend to increase the
 number of workers so Storm has a chance to spread out the work, and keep a
 good balance.

 Hope this helps.

 Chi


 On Fri, May 30, 2014 at 4:24 PM, Raphael Hsieh raffihs...@gmail.com
 wrote:

 I am in the process of optimizing my stream. Currently I expect 5 000
 000 tuples to come out of my spout per minute. I am trying to beef up my
 topology in order to process this in real time without falling behind.

 For some reason my batch size is capping out at 83 thousand tuples. I
 can't seem to make it any bigger, and the processing time doesn't seem to get
 any smaller than 2-3 seconds either.
 I'm not sure how to configure the topology to get any faster / more
 efficient.

 Currently all the topology does is a groupby on time and an aggregation
 (Count) to aggregate everything.

 Here are some data points i've figured out.

 Batch Size:5mb
 num-workers: 1
 parallelismHint: 2
  (I'll write this as 5mb, 1, 2)

 5mb, 1, 2 = 83K tuples / 6s
 10mb, 1, 2 = 83k / 7s
 5mb, 1, 4 = 83k / 6s
 5mb, 2, 4 = 83k / 3s
 5mb, 3, 6 = 83k / 3s
 10mb, 3, 6 = 83k / 3s

 Can anybody help me figure out how to get it to process things faster ?

 My maxSpoutPending is at 1, but when I increased it to 2 it was the
 same. MessageTimeoutSec = 100

 I've been following this blog: https://gist.github.com/mrflip/5958028
 to an extent, not everything word for word though.

 I need to be able to process around 66,000 tuples per second and I'm
 starting to run out of ideas.

 Thanks

 --
 Raphael Hsieh








 --
 Raphael Hsieh







 --
 Data Systems Engineering
 data-syst...@groupon.com




-- 
Raphael Hsieh


Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?

2014-06-02 Thread Chris Bedford
Yes.. if i used prepare or open on spouts or bolts it would work, but
unfortunately it would be a bit brittle.  I'd have to include a spout or
bolt just for initializing my invariant code... i'd rather do that when the
topology is activated on the worker.. so this seems like a good use of an
 activated() method on the StormTopology class (where activated()
would be called after the StormTopology is deserialized by the worker node
process).

But, if there is no such method, I will make do with what is there.

thanks for your response.

chris



On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com
wrote:

 The bolt base classes have a prepare method:


 https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html

 and the spout base classes have a similar activate method:


 https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html

 Is that sufficient for your needs or were you thinking of something
 different?

 Marc

 On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:
  Hi there -
 
  I would like to set up some state that spouts and bolts share, and I'd
 like to
  prepare this state when the StormTopology gets 'activated' on a worker.
 
  it would be great if the StormTopology had something like a prepare or
 open
  method to indicate when it is starting.  I looked but i could find no
 such API.
Maybe I should submit an enhancement request ?
 
  Thanks in advance for your responses,
-  Chris
 
 
 
  [ if anyone is curious, the shared state is for all my application code
 to
  check or not check invariants.  the invariant checking takes additional
 time,
  so we don't want to do it in production.. but during testing/development
 it
  helps catch bugs].
 
  --
  Chris Bedford
 
  Founder  Lead Lackey
  Build Lackey Labs:  http://buildlackey.com
  Go Grails!: http://blog.buildlackey.com
 
 




-- 
Chris Bedford

Founder & Lead Lackey
Build Lackey Labs:  http://buildlackey.com
Go Grails!: http://blog.buildlackey.com


Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?

2014-06-02 Thread Michael Rose
You don't have to include a specific bolt for init code. It's not difficult
to push your init code into a separate class and call it from your bolts,
lock on that class, run init, and then allow other instances to skip over
it.

Without changing bolt/spout code, I've taken to including a task hook for
init code (e.g. properties / Guice). Check out BaseTaskHook, it's easily
extendible and can be included pretty easily too:

stormConfig.put(Config.TOPOLOGY_AUTO_TASK_HOOKS,
Lists.newArrayList(MyTaskHook.class.getName()));
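A hook registered that way can be sketched like this (assuming backtype.storm.hooks.BaseTaskHook; MyTaskHook and the SharedInit helper are hypothetical names, not from the thread):

```java
// Runs once per task when the worker sets the task up, before tuples flow --
// a reasonable place for shared init (properties, Guice injectors, ...).
public class MyTaskHook extends BaseTaskHook {
    @Override
    public void prepare(Map conf, TopologyContext context) {
        SharedInit.ensureInitialized(conf);  // hypothetical init helper
    }
}
```

Note that, as discussed below in the thread, this still runs per task, so the init helper itself must guard against repeated execution within one JVM.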

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com


On Mon, Jun 2, 2014 at 7:25 PM, Chris Bedford ch...@buildlackey.com wrote:

 Yes.. if i used prepare or open on spouts or bolts it would work, but
 unfortunately it would be a bit brittle.  I'd have to include a spout or
 bolt just for initializing my invariant code... i'd rather do that when the
 topology is activated on the worker.. so this seems like a good use of an
  activated()method on the StormTopology class   (where activated()
 would be called after the StormTopology is deserialized by the worker node
 process).

 But, if there is no such method, I will make do with what is there.

 thanks for your response.

 chris



 On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com
 wrote:

 The bolt base classes have a prepare method:


 https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html

 and the spout base classes have a similar activate method:


 https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html

 Is that sufficient for your needs or were you thinking of something
 different?

 Marc

 On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:
  Hi there -
 
  I would like to set up some state that spouts and bolts share, and I'd
 like to
  prepare this state when the StormTopology gets 'activated' on a worker.
 
  it would be great if the StormTopology had something like a prepare or
 open
  method to indicate when it is starting.  I looked but i could find no
 such API.
Maybe I should submit an enhancement request ?
 
  Thanks in advance for your responses,
-  Chris
 
 
 
  [ if anyone is curious, the shared state is for all my application code
 to
  check or not check invariants.  the invariant checking takes additional
 time,
  so we don't want to do it in production.. but during
 testing/development it
  helps catch bugs].
 
  --
  Chris Bedford
 
  Founder  Lead Lackey
  Build Lackey Labs:  http://buildlackey.com
  Go Grails!: http://blog.buildlackey.com
 
 




 --
 Chris Bedford

 Founder  Lead Lackey
 Build Lackey Labs:  http://buildlackey.com
 Go Grails!: http://blog.buildlackey.com





Re: Writing Bolts in Python

2014-06-02 Thread Andrew Montalenti
The ShellBolt looks for scancount.py in the resources/ directory in your
JAR, which will be extracted to each worker machine. It then simply invokes
python scancount.py in that directory. So you need to make sure the
scancount.py file will be on the classpath under resources/, as well as the
storm.py interop library it depends upon.

Based on the official word count example
https://github.com/apache/incubator-storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L40-L56,
your Java ShellBolt definition looks OK.

The storm.py interop library that you're probably using then communicates
with the rest of Storm via the Multi-lang Protocol
https://storm.incubator.apache.org/documentation/Multilang-protocol.html.
This means your Python process is really sending JSON messages over stdout
and receiving JSON messages over stdin. That's the relationship between
Python & Java (& Storm) in this case.
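For illustration, each multilang message is one JSON object followed by a line containing just "end". A rough sketch of what crosses the pipe (the ids and field values below are made up):

```python
import json

def multilang_frame(msg):
    # One protocol frame: the JSON-serialized message, then an "end" sentinel line.
    return json.dumps(msg) + "\nend\n"

# What a bolt emitting a tuple writes to stdout, roughly:
outgoing = multilang_frame({"command": "emit", "tuple": ["product-42", 3]})

# What an incoming tuple on stdin looks like, roughly:
incoming = multilang_frame({
    "id": "-6955786537413359385",
    "comp": "scan-spout",
    "stream": "default",
    "task": 9,
    "tuple": ["product-42"],
})
```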

A library I'm working on with my team, streamparse
https://github.com/Parsely/streamparse, makes this workflow easier by
bundling a command-line tool for building/submitting/running Python
topologies. For example, getting a Storm + Python wordcount example to
run locally is just a matter of:

sparse quickstart wordcount
cd wordcount
sparse run

It also eliminates the need to write the Java glue code you're putting
together here. It's still in early development but we're already using it
for real Storm 0.8 and 0.9 production clusters  local development.

---
Andrew Montalenti
Co-Founder  CTO
http://parse.ly



On Mon, Jun 2, 2014 at 12:37 PM, Ashu Goel a...@shopkick.com wrote:

 Hi all,

 I am experimenting with writing bolts in Python and was wondering how the
 relationship between the Java and Python code works. For example, I have a
 Python bolt that looks like this:

 import storm
 from collections import defaultdict

 class ScanCountBolt(storm.BasicBolt):

   def __init__(self):
     # super(ScanCountBolt, self).__init__(script='scancount.py')
     self._count = defaultdict(int)

   def process(self, tup):
     product = tup.values[0]
     self._count[product] += 1
     storm.emit([product, self._count[product]])

 ScanCountBolt().run()


 And my corresponding Java code looks like this:

  public static class ScanCount extends ShellBolt implements IRichBolt {

    public ScanCount() {
      super("python", "scancount.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("product", "scans"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

 Is that all I need to make it work, or do I need to declare the data
 structures in the Java code as well? I am a bit confused...

 -Ashu


Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?

2014-06-02 Thread Chris Bedford
This looks promising. thanks.

Hope you don't mind one more question -- if I create my own
implementation of ITaskHook and add it to the config as you illustrated in
the prev. msg., will the prepare() method of my implementation be called
exactly once, shortly after the StormTopology is deserialized by the worker
node process?

 - chris




On Mon, Jun 2, 2014 at 7:09 PM, Michael Rose mich...@fullcontact.com
wrote:

 You don't have to include a specific bolt for init code. It's not
 difficult to push your init code into a separate class and call it from
 your bolts, lock on that class, run init, and then allow other instances to
 skip over it.

 Without changing bolt/spout code, I've taken to including a task hook for
 init code (e.g. properties / Guice). Check out BaseTaskHook, it's easily
 extendible and can be included pretty easily too:

 stormConfig.put(Config.TOPOLOGY_AUTO_TASK_HOOKS,
 Lists.newArrayList(MyTaskHook.class.getName()));

 Michael Rose (@Xorlev https://twitter.com/xorlev)
 Senior Platform Engineer, FullContact http://www.fullcontact.com/
 mich...@fullcontact.com


 On Mon, Jun 2, 2014 at 7:25 PM, Chris Bedford ch...@buildlackey.com
 wrote:

 Yes.. if i used prepare or open on spouts or bolts it would work, but
 unfortunately it would be a bit brittle.  I'd have to include a spout or
 bolt just for initializing my invariant code... i'd rather do that when the
 topology is activated on the worker.. so this seems like a good use of an
  activated()method on the StormTopology class   (where activated()
 would be called after the StormTopology is deserialized by the worker node
 process).

 But, if there is no such method, I will make do with what is there.

 thanks for your response.

 chris



 On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com
 wrote:

 The bolt base classes have a prepare method:


 https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html

 and the spout base classes have a similar activate method:


 https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html

 Is that sufficient for your needs or were you thinking of something
 different?

 Marc

 On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:
  Hi there -
 
  I would like to set up some state that spouts and bolts share, and I'd
 like to
  prepare this state when the StormTopology gets 'activated' on a worker.
 
  it would be great if the StormTopology had something like a prepare or
 open
  method to indicate when it is starting.  I looked but i could find no
 such API.
Maybe I should submit an enhancement request ?
 
  Thanks in advance for your responses,
-  Chris
 
 
 
  [ if anyone is curious, the shared state is for all my application
 code to
  check or not check invariants.  the invariant checking takes
 additional time,
  so we don't want to do it in production.. but during
 testing/development it
  helps catch bugs].
 
  --
  Chris Bedford
 
  Founder  Lead Lackey
  Build Lackey Labs:  http://buildlackey.com
  Go Grails!: http://blog.buildlackey.com
 
 




 --
 Chris Bedford

 Founder  Lead Lackey
 Build Lackey Labs:  http://buildlackey.com
 Go Grails!: http://blog.buildlackey.com






-- 
Chris Bedford

Founder & Lead Lackey
Build Lackey Labs:  http://buildlackey.com
Go Grails!: http://blog.buildlackey.com


Re: Is there any way for my application code to get notified after it gets deserialized on a worker node and before spouts/bolts are opened/prepared ?

2014-06-02 Thread Michael Rose
No, it will be called per bolt instance. That's why init code needs to be
guarded behind a double-check lock to guarantee it only executes once per
JVM.

e.g.

private static volatile boolean initialized = false;

...

if (!initialized) {
    synchronized (MyInitCode.class) {
        if (!initialized) {
            // do stuff
            initialized = true;
        }
    }
}

Until there's a set of lifecycle hooks, that's about as good as I've cared
to make it.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com


On Mon, Jun 2, 2014 at 8:27 PM, Chris Bedford ch...@buildlackey.com wrote:

 This looks promising. thanks.

 hope you don't mind one more question  --  if i create my own
 implementation of ITaskHook and add it do the config as you illustrated in
 prev. msg..will the prepare() method of my implementation be called
 exactly once shortly after StormTopology is deserialized by the worker
 node process ?

  - chris




 On Mon, Jun 2, 2014 at 7:09 PM, Michael Rose mich...@fullcontact.com
 wrote:

 You don't have to include a specific bolt for init code. It's not
 difficult to push your init code into a separate class and call it from
 your bolts, lock on that class, run init, and then allow other instances to
 skip over it.

 Without changing bolt/spout code, I've taken to including a task hook for
 init code (e.g. properties / Guice). Check out BaseTaskHook, it's easily
 extendible and can be included pretty easily too:

 stormConfig.put(Config.TOPOLOGY_AUTO_TASK_HOOKS,
 Lists.newArrayList(MyTaskHook.class.getName()));

 Michael Rose (@Xorlev https://twitter.com/xorlev)
 Senior Platform Engineer, FullContact http://www.fullcontact.com/
 mich...@fullcontact.com


 On Mon, Jun 2, 2014 at 7:25 PM, Chris Bedford ch...@buildlackey.com
 wrote:

 Yes.. if i used prepare or open on spouts or bolts it would work, but
 unfortunately it would be a bit brittle.  I'd have to include a spout or
 bolt just for initializing my invariant code... i'd rather do that when the
 topology is activated on the worker.. so this seems like a good use of an
  activated()method on the StormTopology class   (where activated()
 would be called after the StormTopology is deserialized by the worker node
 process).

 But, if there is no such method, I will make do with what is there.

 thanks for your response.

 chris



 On Mon, Jun 2, 2014 at 6:28 AM, Marc Vaillant vaill...@animetrics.com
 wrote:

 The bolt base classes have a prepare method:


 https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseBasicBolt.html

 and the spout base classes have a similar activate method:


 https://storm.incubator.apache.org/apidocs/backtype/storm/topology/base/BaseRichSpout.html

 Is that sufficient for your needs or were you thinking of something
 different?

 Marc

 On Sun, Jun 01, 2014 at 04:47:03PM -0700, Chris Bedford wrote:
  Hi there -
 
  I would like to set up some state that spouts and bolts share, and
 I'd like to
  prepare this state when the StormTopology gets 'activated' on a
 worker.
 
  it would be great if the StormTopology had something like a prepare
 or open
  method to indicate when it is starting.  I looked but i could find no
 such API.
Maybe I should submit an enhancement request ?
 
  Thanks in advance for your responses,
-  Chris
 
 
 
  [ if anyone is curious, the shared state is for all my application
 code to
  check or not check invariants.  the invariant checking takes
 additional time,
  so we don't want to do it in production.. but during
 testing/development it
  helps catch bugs].
 
  --
  Chris Bedford
 
  Founder  Lead Lackey
  Build Lackey Labs:  http://buildlackey.com
  Go Grails!: http://blog.buildlackey.com
 
 




 --
 Chris Bedford

 Founder  Lead Lackey
 Build Lackey Labs:  http://buildlackey.com
 Go Grails!: http://blog.buildlackey.com






 --
 Chris Bedford

 Founder & Lead Lackey
 Build Lackey Labs:  http://buildlackey.com
 Go Grails!: http://blog.buildlackey.com





Re: Topology acked/emitted count reset

2014-06-02 Thread Harsha
Hi Andrew,

  From what I read in the code, executor.clj (worker) is
responsible for updating the stats for bolts and spouts. If a worker
is restarted, or if a topology is rebalanced, there is a chance of
losing the stats. Topology stats are derived from the spout and bolt
stats; no stats are kept for the topology itself. So if a
worker/supervisor died and was restarted on another node, the stats for
that supervisor's workers are lost.

Thanks,

Harsha





On Mon, Jun 2, 2014, at 07:07 PM, Andrew Montalenti wrote:

Attached you'll find two screenshots from the Storm UI, one taken this
morning, and one taken just recently. The Storm topology in question --
cass -- was not restarted in between.

You can see the uptime is 13h (storm_ui_healthy.png) and 26h
(storm_ui_num_reset.png), respectively. Yet, notice that in the later
screenshot, the acked counter for the all-time window has dropped
from 27.2 million to 3.9 million. All the other counts have also
dropped.

What explains this? Shouldn't all-time emit/ack counts for a topology
that's been running 26h non-stop always be greater than the same
topology 13h earlier?

This is with Storm 0.9.1-incubating.

---
Andrew Montalenti
Co-Founder  CTO
[1]http://parse.ly

  Email had 2 attachments:
  * storm_ui_num_reset.png (566k, image/png)
  * storm_ui_healthy.png (541k, image/png)

References

1. http://parse.ly/


Custom metrics using IMetrics interface

2014-06-02 Thread Xueming Li
Hi all,

I am working on a project to build an order processing pipeline on top of
Storm. In order to measure performance, for every spout/bolt and every
order processed by them, one requirement is to generate custom metrics in
the form (order id, order entry timestamp in millisec, order exit timestamp
in millisec) and then use a metrics consumer to collect and further process
those metrics. I will greatly appreciate it if anyone can share some ideas
as to how to implement this requirement. My question is: getValueAndReset
in the IMetric interface is a callback method. It looks to me that in order
to collect the metrics using the IMetric interface, I will have to use a
memory queue to temporarily store the metrics generated when orders are
processed in any spout or bolt, and wait for getValueAndReset to be called.
Any better idea?
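The queue-draining approach described here can be sketched as follows. This is a standalone illustration: the real callback is Object getValueAndReset() on Storm's backtype.storm.metric.api.IMetric (registered via TopologyContext.registerMetric in each component's prepare/open); the class and method names below are otherwise hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class OrderLatencyMetric {
    static class OrderTiming {
        final String orderId;
        final long entryMillis, exitMillis;
        OrderTiming(String orderId, long entryMillis, long exitMillis) {
            this.orderId = orderId;
            this.entryMillis = entryMillis;
            this.exitMillis = exitMillis;
        }
    }

    // Thread-safe buffer: execute()/nextTuple() threads record timings,
    // the metrics tick thread drains them via getValueAndReset().
    private final ConcurrentLinkedQueue<OrderTiming> buffer = new ConcurrentLinkedQueue<>();

    void record(String orderId, long entryMillis, long exitMillis) {
        buffer.add(new OrderTiming(orderId, entryMillis, exitMillis));
    }

    // Mirrors IMetric#getValueAndReset: drain everything buffered since the
    // last tick and hand the snapshot to the metrics consumer.
    Object getValueAndReset() {
        List<OrderTiming> snapshot = new ArrayList<>();
        OrderTiming t;
        while ((t = buffer.poll()) != null) {
            snapshot.add(t);
        }
        return snapshot;
    }

    public static void main(String[] args) {
        OrderLatencyMetric m = new OrderLatencyMetric();
        m.record("order-1", 1000L, 1450L);
        m.record("order-2", 1005L, 1500L);
        System.out.println(((List<?>) m.getValueAndReset()).size()); // 2
        System.out.println(((List<?>) m.getValueAndReset()).size()); // 0 (reset)
    }
}
```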

Thanks,
James