java.lang.IllegalArgumentException: timeout value is negative

2014-06-03 Thread P Ghosh
I'm getting this exception:

2014-06-03 19:59:13 STDIO [ERROR][id:] Jun 03, 2014 7:59:13 PM org.jboss.netty.channel.DefaultChannelPipeline
WARNING: An exception was thrown by a user handler while handling an exception event ([id: 0xdcf3be42] EXCEPTION: java.net.ConnectException: Connection refused)
java.lang.IllegalArgumentException: timeout value is negative
    at java.lang.Thread.sleep(Native Method)
    at backtype.storm.messaging.netty.Client.reconnect(Client.java:94)
    at backtype.storm.messaging.netty.StormClientHandler.exceptionCaught(StormClientHandler.java:118)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377)


*Here's my storm.yaml (from the nimbus/zookeeper-only server, hence the
supervisor ports are commented out):*
storm.zookeeper.servers:
 - ma-app05.corp.mydomain.com
storm.zookeeper.port: 2181
storm.zookeeper.root: /storm
storm.zookeeper.session.timeout: 2
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 3


nimbus.host: ma-app05.corp.mydomain.com
nimbus.thrift.port: 6627
nimbus.thrift.max_buffer_size: 1048576
nimbus.childopts: -Xmx1024m
nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
nimbus.monitor.freq.secs: 10
nimbus.cleanup.inbox.freq.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
nimbus.task.launch.secs: 120
nimbus.reassign: true
nimbus.file.copy.expiration.secs: 600
nimbus.topology.validator: backtype.storm.nimbus.DefaultTopologyValidator

#No supervisor on the nimbus host. Please note that Storm might still load
#the default conf if it requires it.
#supervisor.slots.ports:
#  - 9951
#  - 9952
#  - 9953
#  - 9954
#  - 9955
#supervisor.worker.start.timeout.secs: 120
#supervisor.worker.timeout.secs: 30
#supervisor.monitor.frequency.secs: 3
#supervisor.heartbeat.frequency.secs: 5
#supervisor.enable: true


ui.port: 8181
ui.childopts: -Xmx256m


logviewer.port: 8000
logviewer.childopts: -Xmx128m
logviewer.appender.name: A1

worker.childopts: -Xmx512m -XX:MaxPermSize=256m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:/app/local/var/logs/jvm-stat/gc-storm-worker-%ID%.log
worker.heartbeat.frequency.secs: 1

task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10

#zmq.threads: 1
#zmq.linger.millis: 5000
#zmq.hwm: 0

storm.local.dir: /ngs/app/isdbd/local/var/run/storm/data

storm.local.mode.zmq: false


storm.cluster.mode: distributed

storm.thrift.transport: backtype.storm.security.auth.SimpleTransportPlugin

storm.messaging.transport: backtype.storm.messaging.netty.Context
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 500

java.library.path: /usr/local/lib:/opt/local/lib:/usr/lib

topology.enable.message.timeouts: true
topology.debug: false
topology.optimize: true
topology.workers: 5
topology.acker.executors: null
topology.tasks: null
topology.message.timeout.secs: 30
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: 20
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true
topology.worker.childopts: -Xmx512m
topology.executor.receive.buffer.size: 1024

topology.executor.send.buffer.size: 1024

topology.receiver.buffer.size: 8

topology.transfer.buffer.size: 1024

topology.tick.tuple.freq.secs: null
topology.worker.shared.thread.pool.size: 4
topology.disruptor.wait.strategy: com.lmax.disruptor.BlockingWaitStrategy
topology.spout.wait.strategy: backtype.storm.spout.SleepSpoutWaitStrategy
topology.sleep.spout.wait.strategy.time.ms: 1
topology.error.throttle.interval.secs: 10
topology.max.error.report.per.interval: 5
topology.kryo.factory: backtype.storm.serialization.DefaultKryoFactory
topology.tuple.serializer: backtype.storm.serialization.types.ListDelegateSerializer
topology.trident.batch.emit.interval.millis: 500

#dev.zookeeper.path: /tmp/dev-storm-zookeeper
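
The trace shows Thread.sleep() receiving a negative value inside
Client.reconnect(). One way that can happen (an assumption; this thread does
not confirm the cause, and the code below is not Storm's actual source) is an
exponential backoff computed in int arithmetic: with min_wait_ms: 500 and
max_retries: 100 as configured above, the product can wrap around to a
negative number after enough retries. BackoffSketch and both method names are
hypothetical.

```java
/** Sketch only: models an int-based exponential backoff, not Storm's real code. */
final class BackoffSketch {

    /** Naive backoff: all arithmetic is done in int, so the product can wrap negative. */
    static int naiveSleepMs(int baseMs, int maxMs, int retries) {
        int sleepMs = baseMs * (1 << retries);     // wraps once it exceeds Integer.MAX_VALUE
        return sleepMs > maxMs ? maxMs : sleepMs;  // a negative result slips past this cap
    }

    /** Safer variant: bound the shift, widen to long, then clamp to the maximum. */
    static long safeSleepMs(int baseMs, int maxMs, int retries) {
        int shift = Math.min(Math.max(retries, 0), 30);
        long sleepMs = (long) baseMs * (1L << shift);
        return Math.min(sleepMs, (long) maxMs);
    }

    public static void main(String[] args) throws InterruptedException {
        int bad = naiveSleepMs(500, 1000, 23);
        System.out.println("naive sleep after 23 retries: " + bad);
        try {
            Thread.sleep(bad);                     // rejects negative arguments
        } catch (IllegalArgumentException e) {
            System.out.println("Thread.sleep rejected it: " + e.getMessage());
        }
        System.out.println("safe sleep after 23 retries: " + safeSleepMs(500, 1000, 23));
    }
}
```

If this is the cause, lowering storm.messaging.netty.max_retries would keep
the retry count below the point where the product wraps, but that is a guess
to verify against the Storm version in use.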


Conceptual question on Streams definition...

2014-05-23 Thread P Ghosh
My definition of a stream is a continuous feed of data of a certain type or
with a certain purpose (depending on how you want to define your process).

I have a situation where the domain object is the same across the whole
topology, but each component works on bits and pieces to construct the
final document (a JSON document).
Option 1 sounds logical when I consider that everything is working on the
same domain object. Option 2 sounds logical when I consider that those
streams represent different parts of the domain object, so they are not
really the same.
[image: Inline image 2]


Source link for the image -
https://drive.google.com/file/d/0B7Y7mM2uzsNFTWRJekZzS0FXUDQ/edit?usp=sharing

Please note that SpoutA to BoltC1 is part of a transaction, so SpoutA
should get an ACK only when all bolts have acked.

What I'm trying to understand is how Option 1 and Option 2 affect the
functionality.

Just an FYI: BoltC1 has a

RotatingMap<List<Object>, Map<GlobalStreamId, Tuple>> pendingTuples
which it uses to ensure that it acks back only when it has received all
tuples from the previous bolts.

Thanks,
Prasun


Re: Is it possible to join another spout/bolt later to the topology?

2014-05-22 Thread P Ghosh
Can you elaborate on what exactly you mean by "join"? You can have a bolt
defined as part of the topology that loads the other jar in its prepare()
method and calls the actual functional methods in execute(). This way you
are dynamically loading the other jar into your Storm topology (which, in
my view, is not a great idea). The point of concern, however, is how you
will distribute the other jar; that's a hassle. How big is your OSGi jar,
and why is it such a big concern?

Prasun




On Thu, May 22, 2014 at 2:23 AM, Sajith sajith...@gmail.com wrote:

 Hi all,

 Is it possible for us to join another spout or bolt (not an instance, but
 a separate one) to the topology after the topology has been deployed?

 My requirement is to receive all the tuples processed by a Storm cluster
 in a single bolt which is developed on OSGi and has many other
 dependencies. I don't want this bolt to be added to the original topology,
 since the JAR which contains the topology becomes bulky.

 Therefore, is it possible for me to join this special spout to the cluster
 to receive messages once the topology is deployed?

 Any other suggestions or recommendations to achieve this are appreciated.

 Thanks,
 Sajith.



Re: Understanding ACKing mechanism

2014-05-20 Thread P Ghosh
Got the issue resolved.
1. I was not anchoring emitted tuples to the incoming tuple, so effectively
none of the bolts after impactBolt were transactional. The ack from
impactBolt was causing the spout's ack to be called, because a proper DAG
was never created. So the number I was seeing in WIP was not the true number
of pending tuples, which made the whole thing confusing. After I put in
proper anchoring and the fix below, I can see pendingTuples dropping to zero
occasionally, which means it is working as expected.
2. I was not doing a *parts.put(...)* after *parts = new
HashMap<GlobalStreamId, Tuple>();* in the code from my original post (a
slip). This was resulting in a leak.
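
A minimal sketch of fix 2, written against plain Java collections so it runs
without Storm: String stands in for both GlobalStreamId and the Tuple
payload, and JoinBufferSketch, offer() and pendingCount() are hypothetical
names, not the code from the original bolt. The point it illustrates is that
the part is stored on the first arrival as well, so an entry can actually
reach the expected size and be removed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: buffers one part per source stream until all sources have reported. */
final class JoinBufferSketch {
    private final int numSources;
    private final Map<String, Map<String, String>> pending = new HashMap<>();

    JoinBufferSketch(int numSources) {
        this.numSources = numSources;
    }

    /** Stores one part; returns every part once the join is complete, else an empty list. */
    List<String> offer(String id, String sourceStream, String payload) {
        Map<String, String> parts = pending.computeIfAbsent(id, k -> new HashMap<>());
        parts.put(sourceStream, payload);          // executed for the first part too
        if (parts.size() < numSources) {
            return Collections.emptyList();
        }
        pending.remove(id);                        // completed joins must not linger
        return new ArrayList<>(parts.values());
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        JoinBufferSketch buffer = new JoinBufferSketch(2);
        System.out.println(buffer.offer("nyc", "restaurantBolt_stream", "{...}"));
        System.out.println(buffer.offer("nyc", "crimeBolt_stream", "{...}"));
        System.out.println("pending: " + buffer.pendingCount());
    }
}
```

Without the put on the first arrival, an id whose sources each emit once
never reaches the expected count, so its entry stays in the map until
something else evicts it.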

Thanks,
Prasun



Understanding ACKing mechanism

2014-05-19 Thread P Ghosh
I have a topology that looks like this:
*All bolts emit Fields "id", "json"*
*The spout emits only "id"*
*All bolts/spouts use the stream name <ComponentName>_stream when emitting*

*Topology Definition*

builder.setSpout("citySpout", citySpout, 10);

builder.setBolt("impactBolt", impactBolt, 5)
    .fieldsGrouping("citySpout", "citySpout_stream", new Fields("id"));

builder.setBolt("restaurantBolt", restaurantBolt, 5)
    .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));

builder.setBolt("theatresBolt", theatresBolt, 5)
    .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));

builder.setBolt("libraryBolt", libraryBolt, 5)
    .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));

builder.setBolt("transportBolt", transportBolt, 5)
    .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));

builder.setBolt("crimeBolt", crimeBolt, 5)
    .fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));

builder.setBolt("combinerBolt", combinerBolt, 5)
    .fieldsGrouping("restaurantBolt", "restaurantBolt_stream", new Fields("id"))
    .fieldsGrouping("theatresBolt", "theatresBolt_stream", new Fields("id"))
    .fieldsGrouping("libraryBolt", "libraryBolt_stream", new Fields("id"))
    .fieldsGrouping("transportBolt", "transportBolt_stream", new Fields("id"))
    .fieldsGrouping("crimeBolt", "crimeBolt_stream", new Fields("id"));


*CombinerBolt*

public class CombinerBolt extends BaseRichBolt {
    ...

    public void execute(Tuple tuple) {
        // Gets the value corresponding to "id" from the tuple
        String id = getId(tuple);
        List<Object> idList = Arrays.asList(new Object[] { id });
        GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(),
                tuple.getSourceStreamId());
        Map<GlobalStreamId, Tuple> parts;

        if (!pendingTuples.containsKey(idList)) {
            parts = new HashMap<GlobalStreamId, Tuple>();
            pendingTuples.put(idList, parts);
        } else {
            parts = pendingTuples.get(idList);
            logger.debug("Pending Tuples [Count:" + pendingTuples.size() + "]");
            if (parts.containsKey(streamId)) {
                logger.warn("Received same side of single join twice, overriding");
            }
            parts.put(streamId, tuple);
            // _numSources is computed in prepare(..) using context.getThisSources().size()
            if (parts.size() == _numSources) {
                pendingTuples.remove(idList);
                List<Tuple> tuples = new ArrayList<Tuple>(parts.values());
                try {
                    // This is where the actual document merging is done
                    processTuple(tuples);
                } catch (Exception exc) {
                    logger.error("There was an exception processing Tuples [" + tuples + "]", exc);
                }
            }
        }
        getOutputCollector().ack(tuple);
    }

    ...
}

In my citySpout I have a work-in-progress (WIP) set in Redis (having a set
ensures that we don't have multiple transactions for the same city at the
same time). Every id (city) that is emitted is put into it, and it is
removed when the corresponding ack or fail is invoked on the spout.

*1.* I'm seeing a lot of "Received same side of single join twice,
overriding" warnings. I expected otherwise: since I'm acking without
waiting for the join, there shouldn't be many retries happening.

*2.* I deactivated the topology and could see the WIP going down to 0
within a few seconds, but I continued to see my bolts working for another
few seconds even when the WIP was empty. Items are removed from the WIP
only when an ACK/FAIL is received at the spout. Based on the details
provided in
https://github.com/nathanmarz/storm/wiki/Acking-framework-implementation ,
my expectation was that processing would stop the moment the WIP reached 0.

I'm curious why my bolts are still getting data.
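
One way to reason about observation 2 is to model the XOR-based tracking
that the linked wiki page describes: the acker keeps a single 64-bit value
per spout tuple, XORs in every tuple id when it is anchored and again when
it is acked, and treats the tree as complete when the value returns to zero.
The sketch below is a stdlib-only model with made-up ids; AckTreeSketch and
its methods are hypothetical names, not Storm classes. It shows that if a
bolt acks its input without anchoring what it emitted, the value hits zero
immediately, so the spout is acked while downstream bolts are still working.

```java
/** Sketch only: models XOR-based tuple-tree tracking with made-up 64-bit ids. */
final class AckTreeSketch {
    private long ackVal;

    /** The spout emits the root tuple of a tree. */
    void rootEmitted(long rootId) {
        ackVal ^= rootId;
    }

    /** A bolt acks ackedId after emitting the given children anchored to it. */
    void acked(long ackedId, long... anchoredChildIds) {
        ackVal ^= ackedId;
        for (long child : anchoredChildIds) {
            ackVal ^= child;
        }
    }

    /** The spout's ack fires once every id has been XORed in exactly twice. */
    boolean treeComplete() {
        return ackVal == 0L;
    }

    public static void main(String[] args) {
        AckTreeSketch anchored = new AckTreeSketch();
        anchored.rootEmitted(0xA1L);
        anchored.acked(0xA1L, 0xB2L, 0xC3L);   // children anchored: tree still open
        System.out.println("anchored, after first ack: " + anchored.treeComplete());

        AckTreeSketch unanchored = new AckTreeSketch();
        unanchored.rootEmitted(0xA1L);
        unanchored.acked(0xA1L);               // nothing anchored: tree closes at once
        System.out.println("unanchored, after first ack: " + unanchored.treeComplete());
    }
}
```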

I will continue to investigate. In the meantime, if you see any glaring
issue with this approach, please let me know.

Thanks,
Prasun


Re: Externalize storm.yaml file

2014-05-19 Thread P Ghosh
Use soft links.
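
A minimal sketch of the soft-link approach, assuming the externally managed
file has already been copied to a local path (a symbolic link cannot point
into an S3 bucket unless the bucket is mounted as a file system).
ConfLinkSketch, linkConf() and the temporary file names are hypothetical; in
a real install the link would sit at $STORM_DIR/conf/storm.yaml.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch only: points a conf file name at an externally managed file via a symlink. */
final class ConfLinkSketch {

    /** Replaces whatever is at link with a symbolic link to target. */
    static Path linkConf(Path link, Path target) throws IOException {
        Files.deleteIfExists(link);
        return Files.createSymbolicLink(link, target);
    }

    public static void main(String[] args) throws IOException {
        // A temporary directory stands in for both the external location and the conf dir.
        Path dir = Files.createTempDirectory("storm-conf-sketch");
        Path external = Files.write(dir.resolve("external-storm.yaml"),
                "nimbus.host: example\n".getBytes(StandardCharsets.UTF_8));
        Path link = linkConf(dir.resolve("storm.yaml"), external);
        // Reading through the link returns the external file's content.
        System.out.print(new String(Files.readAllBytes(link), StandardCharsets.UTF_8));
    }
}
```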

Prasun

On May 14, 2014 6:50 AM, Neha Jain neha_sj...@persistent.co.in wrote:

 Hello,

 I have created a Storm cluster on Amazon EC2 machines. The requirement we
 have is to save the Storm configuration file to Amazon S3.

 There would be a bucket in S3 where the Storm config file would be placed,
 and the Storm cluster should access that file.

 Is it possible to configure the config file location in Storm? By default
 it refers to $STORM_DIR/conf/storm.yaml

 Can we configure the conf directory in Storm to point to a different
 location, so that jobs know where to pick the config file from?

 Thanks in advance

 Regards,
 Neha




Best practice for shutting down storm

2014-05-01 Thread P Ghosh
I have a few topologies running. The spout puts the ID of the object it is
emitting into a WIP list in Redis. When the spout's ack or fail method is
called, it takes the ID out of the WIP list.
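
The bookkeeping described above can be sketched as follows. An in-memory Set
stands in for the Redis structure, and WipSketch and its method names are
hypothetical. Anything still in the set when the process stops is an id for
which neither ack nor fail was ever delivered.

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch only: an in-memory Set stands in for the Redis WIP structure. */
final class WipSketch {
    private final Set<String> wip = new HashSet<>();

    /** Marks an id as in flight; returns false if it already is, so it is not emitted twice. */
    boolean tryEmit(String id) {
        return wip.add(id);
    }

    /** Called when the spout's ack method fires for the id. */
    void onAck(String id) {
        wip.remove(id);
    }

    /** Called when the spout's fail method fires for the id. */
    void onFail(String id) {
        wip.remove(id);
    }

    /** Ids that never received ack or fail, for example because the topology was killed. */
    Set<String> leftovers() {
        return new HashSet<>(wip);
    }

    public static void main(String[] args) {
        WipSketch spoutState = new WipSketch();
        spoutState.tryEmit("city-1");
        spoutState.tryEmit("city-2");
        spoutState.onAck("city-1");
        // If the topology is killed here, city-2 never gets ack or fail.
        System.out.println("left in WIP: " + spoutState.leftovers());
    }
}
```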

The environment and application are undergoing a lot of changes, and as a
result I occasionally have to restart the topology or the Storm cluster
itself.

The problem is that after a restart I see quite a few messages left in WIP,
which means that for these messages the spout didn't receive any ack or
fail.

My restart process has been:
1. Kill the topology from the UI. (I find killing from the UI more
responsive than from the command line: the killed topology goes away very
quickly. If I do it from the command line, the killed topology remains in
the list for a long time, hindering my ability to relaunch it.) I typically
kill it with a 0-second wait time (maybe this is where I'm going wrong).

2. Go to each VM and stop the
  a. supervisor
  b. logviewer
3. Go to nimbus and shut down
  a. ui/nimbus/logviewer
4. Go to zookeeper and shut down zookeeper.


This, I thought, was the proper flow, but I doubt that given the leftover
messages I see in WIP.

Any thoughts will be helpful.

Thanks,
Prasun


Understanding metrics.log

2014-04-22 Thread P Ghosh
I added metrics to my Storm implementation by implementing IMetric. It is
working, and I can see the metrics log populated with all the stats. I have
3 worker nodes and 1 nimbus/zookeeper node in development.
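
A minimal sketch of the kind of metric described above. ResettableMetric is
a local stand-in that mirrors the single getValueAndReset() method of
Storm's IMetric, so the example compiles without Storm on the classpath.
TimeConsumedMetric and the three values it reports are hypothetical, not the
implementation that produced the log lines below.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Local stand-in for Storm's IMetric (assumption: same single-method shape). */
interface ResettableMetric {
    Object getValueAndReset();
}

/** Hypothetical timing metric: accumulates durations between two reporting ticks. */
final class TimeConsumedMetric implements ResettableMetric {
    private long count;
    private long totalMs;
    private long maxMs;

    /** Records one measured duration in milliseconds. */
    synchronized void record(long elapsedMs) {
        count++;
        totalMs += elapsedMs;
        maxMs = Math.max(maxMs, elapsedMs);
    }

    /** Returns a snapshot of the current bucket and starts a fresh one. */
    @Override
    public synchronized Object getValueAndReset() {
        Map<String, Long> snapshot = new LinkedHashMap<>();
        snapshot.put("count", count);
        snapshot.put("totalMs", totalMs);
        snapshot.put("maxMs", maxMs);
        count = 0;
        totalMs = 0;
        maxMs = 0;
        return snapshot;
    }

    public static void main(String[] args) {
        TimeConsumedMetric metric = new TimeConsumedMetric();
        metric.record(10);
        metric.record(30);
        System.out.println(metric.getValueAndReset());
        System.out.println(metric.getValueAndReset());
    }
}
```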

In WORKER1's metrics log I can see some metrics that refer to WORKER2 and
WORKER3. For example:

2014-04-22 16:42:14,829 230021398184934 *worker3*:9953 17:normalizeBoltnull{bolt.timeconsumed={8, 1657, 5135, 41}}
2014-04-22 16:42:14,830 230031398184934 *worker3*:9953 38:normalizeBoltnull{db.searchip={3, 77, 221, 7}, fetch-doc={3, 32, 101, 8}}
2014-04-22 16:42:14,833 230061398184934 *worker3*:9953 31:normalizeBoltnull{bolt.timeconsumed={7, 1638, 5036, 36}}
2014-04-22 16:42:14,833 230061398184934 *worker3*:9953 10:normalizeBoltnull{db.searchip={3, 86, 165, 6}, fetch-doc={3, 17, 53, 7}}
2014-04-22 16:42:14,865 230381398184934 *worker1*:9953 44:normalizeSpout  null{spout.timeconsumed={656, 8454, 56490, 10}}


*My question:*
Why am I seeing the stats of WORKER3 in WORKER1's log file? My expectation
was that each worker would log just its own metrics, and that it is up to
the log collector (like Splunk or Logstash) to merge and interpret them.

Thanks,
Prasun