java.lang.IllegalArgumentException: timeout value is negative
I'm getting this exception:

2014-06-03 19:59:13 STDIO [ERROR][id:] Jun 03, 2014 7:59:13 PM org.jboss.netty.channel.DefaultChannelPipeline
WARNING: An exception was thrown by a user handler while handling an exception event ([id: 0xdcf3be42] EXCEPTION: java.net.ConnectException: Connection refused)
java.lang.IllegalArgumentException: timeout value is negative
        at java.lang.Thread.sleep(Native Method)
        at backtype.storm.messaging.netty.Client.reconnect(Client.java:94)
        at backtype.storm.messaging.netty.StormClientHandler.exceptionCaught(StormClientHandler.java:118)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377)

Here's my storm.yaml (from the nimbus/zookeeper-only server, hence the supervisor ports are commented out):

storm.zookeeper.servers:
    - ma-app05.corp.mydomain.com
storm.zookeeper.port: 2181
storm.zookeeper.root: /storm
storm.zookeeper.session.timeout: 2
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 3

nimbus.host: ma-app05.corp.mydomain.com
nimbus.thrift.port: 6627
nimbus.thrift.max_buffer_size: 1048576
nimbus.childopts: -Xmx1024m
nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
nimbus.monitor.freq.secs: 10
nimbus.cleanup.inbox.freq.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
nimbus.task.launch.secs: 120
nimbus.reassign: true
nimbus.file.copy.expiration.secs: 600
nimbus.topology.validator: backtype.storm.nimbus.DefaultTopologyValidator

# No supervisor on nimbus. Please note that storm might still be loading the default conf if it requires it.
#supervisor.slots.ports:
#    - 9951
#    - 9952
#    - 9953
#    - 9954
#    - 9955
#supervisor.worker.start.timeout.secs: 120
#supervisor.worker.timeout.secs: 30
#supervisor.monitor.frequency.secs: 3
#supervisor.heartbeat.frequency.secs: 5
#supervisor.enable: true

ui.port: 8181
ui.childopts: -Xmx256m
logviewer.port: 8000
logviewer.childopts: -Xmx128m
logviewer.appender.name: A1

worker.childopts: -Xmx512m -XX:MaxPermSize=256m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:/app/local/var/logs/jvm-stat/gc-storm-worker-%ID%.log
worker.heartbeat.frequency.secs: 1
task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10

#zmq.threads: 1
#zmq.linger.millis: 5000
#zmq.hwm: 0

storm.local.dir: /ngs/app/isdbd/local/var/run/storm/data
storm.local.mode.zmq: false
storm.cluster.mode: distributed
storm.thrift.transport: backtype.storm.security.auth.SimpleTransportPlugin
storm.messaging.transport: backtype.storm.messaging.netty.Context
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 500

java.library.path: /usr/local/lib:/opt/local/lib:/usr/lib

topology.enable.message.timeouts: true
topology.debug: false
topology.optimize: true
topology.workers: 5
topology.acker.executors: null
topology.tasks: null
topology.message.timeout.secs: 30
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: 20
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true
topology.worker.childopts: -Xmx512m
topology.executor.receive.buffer.size: 1024
topology.executor.send.buffer.size: 1024
topology.receiver.buffer.size: 8
topology.transfer.buffer.size: 1024
topology.tick.tuple.freq.secs: null
topology.worker.shared.thread.pool.size: 4
topology.disruptor.wait.strategy: com.lmax.disruptor.BlockingWaitStrategy
topology.spout.wait.strategy: backtype.storm.spout.SleepSpoutWaitStrategy
topology.sleep.spout.wait.strategy.time.ms: 1
topology.error.throttle.interval.secs: 10
topology.max.error.report.per.interval: 5
topology.kryo.factory: backtype.storm.serialization.DefaultKryoFactory
topology.tuple.serializer: backtype.storm.serialization.types.ListDelegateSerializer
topology.trident.batch.emit.interval.millis: 500

#dev.zookeeper.path: /tmp/dev-storm-zookeeper
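For what it's worth, the Netty client backs off exponentially between reconnect attempts. If the sleep is computed along the lines of min(max_wait_ms, min_wait_ms * 2^retries) with int arithmetic (an assumption about Client.java's logic, not a quote of it), then with storm.messaging.netty.max_retries: 100 the product overflows long before retry 100 and Thread.sleep() receives a negative value. A standalone illustration:

// Standalone illustration (assumption: the reconnect backoff is computed
// roughly as min(max_wait_ms, min_wait_ms * 2^retries) with int arithmetic;
// this is a guess at Client.java's behavior, not the actual source).
public class BackoffOverflowDemo {
    public static void main(String[] args) {
        final int minWaitMs = 500;   // storm.messaging.netty.min_wait_ms above
        final int maxWaitMs = 1000;  // storm.messaging.netty.max_wait_ms above
        for (int retries = 20; retries <= 40; retries += 4) {
            int sleepMs = Math.min(maxWaitMs, minWaitMs * (1 << retries));
            // Past roughly retry 22, 500 * 2^retries no longer fits in an int
            // and the product can go negative -> Thread.sleep(sleepMs) would
            // throw "timeout value is negative".
            System.out.println("retries=" + retries + " -> sleepMs=" + sleepMs);
        }
    }
}

If that is what is happening here, capping storm.messaging.netty.max_retries well below 30 would keep the computed sleep from ever going negative.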
Conceptual question on Streams definition...
My definition of a stream is a continuous feed of data of a certain type or with a certain purpose (depending on how you want to define your process). I have a situation where the domain object is the same across the whole topology; however, each component works on bits and pieces to construct the final document (a JSON document). Option 1 sounds logical when I consider that everything is working on the same domain object. Option 2 sounds logical when I consider that those streams represent different parts of the domain object, so they are not the same in reality. [image: Inline image 2] Source link for the image - https://drive.google.com/file/d/0B7Y7mM2uzsNFTWRJekZzS0FXUDQ/edit?usp=sharing Please note that SpoutA to BoltC1 is part of a transaction, so SpoutA should get an ACK only when all bolts have acked. What I'm trying to understand is how Option 1 and Option 2 affect the functionality. Just an FYI: BoltC1 has a RotatingMap<List<Object>, Map<GlobalStreamId, Tuple>> pendingTuples which it uses to ensure that it acks back only when it has received all tuples from the previous bolts. Thanks, Prasun
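For readers without the image, a hypothetical sketch of how the two options differ at declaration time (stream ids and field names are illustrative, not from the original post):

import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

public class StreamOptions {

    // Option 1: all components share one stream id; tuples from different
    // bolts are distinguished only by tuple.getSourceComponent().
    static void declareOption1(OutputFieldsDeclarer declarer) {
        declarer.declareStream("domain_object_stream", new Fields("id", "json"));
    }

    // Option 2: each component declares a stream of its own, e.g. in BoltA:
    static void declareOption2(OutputFieldsDeclarer declarer) {
        declarer.declareStream("boltA_stream", new Fields("id", "json"));
        // BoltC1 then subscribes to boltA_stream, boltB_stream, ... and joins
        // on "id"; the GlobalStreamId keys in pendingTuples differ per part,
        // which matches the join bookkeeping described above.
    }
}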
Re: Is it possible to join another spout/bolt later to the topology?
Can you elaborate on what exactly you mean by "join"? You can have a bolt defined as part of the topology which loads the other jar in the prepare() method and calls the actual functional methods in the execute() method (sketched below). This way you are dynamically loading the other jar into your Storm topology (which, in my view, is not a great idea). However, the point of concern is how you will distribute the other jar; that's a hassle. How big is your OSGi jar, and why is it such a big concern? Prasun On Thu, May 22, 2014 at 2:23 AM, Sajith sajith...@gmail.com wrote: Hi all, Is it possible for us to join another spout or a bolt (not an instance, but a separate one) to the topology after the topology has been deployed? My requirement is to deliver all the tuples processed by a Storm cluster to a single bolt which is developed on OSGi and has many other dependencies. I don't want this bolt to be added to the original topology, since the JAR which contains the topology would become bulky. Therefore, is it possible for me to join this special spout to the cluster to receive messages once the topology is deployed? Any other suggestions or recommendations to achieve this are appreciated. Thanks, Sajith.
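A minimal sketch of the load-in-prepare() approach described above. The jar path, class name, and method name are all hypothetical, and the jar is assumed to already be present on every worker node:

import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class DelegatingBolt extends BaseRichBolt {
    private transient Object delegate;
    private transient Method process;
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            // Assumption: the external jar was distributed to this path on every worker.
            URLClassLoader loader = new URLClassLoader(
                    new URL[] { new URL("file:///path/to/osgi-bundle.jar") },
                    getClass().getClassLoader());
            Class<?> clazz = loader.loadClass("com.example.ExternalProcessor"); // hypothetical class
            delegate = clazz.newInstance();
            process = clazz.getMethod("process", String.class);                 // hypothetical method
        } catch (Exception e) {
            throw new RuntimeException("Failed to load external jar", e);
        }
    }

    public void execute(Tuple input) {
        try {
            process.invoke(delegate, input.getString(0)); // hand the tuple's payload to the external code
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt in this sketch: nothing emitted downstream
    }
}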
Re: Understanding ACKing mechanism
Got the issue resolved.

1. I was not anchoring to the incoming tuple, so effectively all the bolts after impactBolt were not transactional. The ack of impactBolt was causing the spout's ack to be called, and a proper DAG was not created. So the number I was seeing in WIP was not the true number of tuples that were pending, and the whole thing was confusing. After I put in proper anchoring and the fix below, I can see pendingTuples becoming zero occasionally, which means it is working as expected.

2. I was not doing a parts.put(...) after parts = new HashMap<GlobalStreamId, Tuple>(); above (a slip-up). This was resulting in a leak.

Thanks, Prasun

On Mon, May 19, 2014 at 1:12 AM, P Ghosh javadevgh...@gmail.com wrote:

I have a topology that looks like this. All bolts emit Fields("id", "json"); the spout emits only "id". All bolts/spouts use the stream name "<ComponentName>_stream" while emitting.

Topology definition:

builder.setSpout("citySpout", citySpout, 10);
builder.setBolt("impactBolt", impactBolt, 5).fieldsGrouping("citySpout", "citySpout_stream", new Fields("id"));
builder.setBolt("restaurantBolt", restaurantBolt, 5).fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
builder.setBolt("theatresBolt", theatresBolt, 5).fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
builder.setBolt("libraryBolt", libraryBolt, 5).fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
builder.setBolt("transportBolt", transportBolt, 5).fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
builder.setBolt("crimeBolt", crimeBolt, 5).fieldsGrouping("impactBolt", "impactBolt_stream", new Fields("id"));
builder.setBolt("combinerBolt", combinerBolt, 5)
    .fieldsGrouping("restaurantBolt", "restaurantBolt_stream", new Fields("id"))
    .fieldsGrouping("theatresBolt", "theatresBolt_stream", new Fields("id"))
    .fieldsGrouping("libraryBolt", "libraryBolt_stream", new Fields("id"))
    .fieldsGrouping("transportBolt", "transportBolt_stream", new Fields("id"))
    .fieldsGrouping("crimeBolt", "crimeBolt_stream", new Fields("id"));

CombinerBolt:

public class CombinerBolt extends BaseRichBolt {
    ...
    public void execute(Tuple tuple) {
        String id = getId(tuple); // gets the value corresponding to "id" from the tuple
        List<Object> idList = Arrays.asList(new Object[] { id });
        GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
        Map<GlobalStreamId, Tuple> parts;
        if (!pendingTuples.containsKey(idList)) {
            parts = new HashMap<GlobalStreamId, Tuple>();
            pendingTuples.put(idList, parts);
        } else {
            parts = pendingTuples.get(idList);
            logger.debug("Pending Tuples [Count:" + pendingTuples.size() + "]");
            if (parts.containsKey(streamId)) {
                logger.warn("Received same side of single join twice, overriding");
            }
            parts.put(streamId, tuple);
            if (parts.size() == _numSources) { // _numSources is computed in prepare(..) using context.getThisSources().size()
                pendingTuples.remove(idList);
                List<Tuple> tuples = new ArrayList<Tuple>(parts.values());
                try {
                    processTuple(tuples); // this is where the actual document merging is done
                } catch (Exception exc) {
                    logger.error("There was an exception processing Tuples [" + tuples + "]", exc);
                }
            }
        }
        getOutputCollector().ack(tuple);
    }
    ...
}

In my citySpout I have a work-in-progress (WIP) set (in Redis) [having a set ensures that we don't have multiple transactions for the same city at the same time], where every id (city) that is emitted is put in, and it is removed when the corresponding ack or fail is invoked on the spout.

1. I'm seeing a lot of "Received same side of single join twice, overriding". My expectation was otherwise: since I'm acking without waiting for the join, there shouldn't be a lot of retrying happening.

2. I deactivated the topology and could see the WIP going down to 0 in a few seconds; however, I continued to see my bolts working for another few seconds even when the WIP had nothing in it. Items are removed from WIP only when an ACK/FAIL is received at the spout. Based on the details provided in https://github.com/nathanmarz/storm/wiki/Acking-framework-implementation , my expectation was that processing would stop the moment WIP is 0. I'm curious why my bolts are still getting data.

I will continue to do more investigation. In the meantime, if you see any glaring issue with this approach, please let me know.

Thanks, Prasun
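For readers landing here later, a sketch of execute() with both fixes folded in. It reuses the names from the quoted code (pendingTuples, _numSources, getId, processTuple) and is a sketch of the described resolution, not the exact patched source:

public void execute(Tuple tuple) {
    String id = getId(tuple); // value of the "id" field
    List<Object> idList = Arrays.asList(new Object[] { id });
    GlobalStreamId streamId =
            new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());

    Map<GlobalStreamId, Tuple> parts = pendingTuples.get(idList);
    if (parts == null) {
        parts = new HashMap<GlobalStreamId, Tuple>();
        pendingTuples.put(idList, parts);
    }
    if (parts.containsKey(streamId)) {
        logger.warn("Received same side of single join twice, overriding");
    }
    parts.put(streamId, tuple); // fix #2: store the first arrival too, not only the later ones

    if (parts.size() == _numSources) {
        pendingTuples.remove(idList);
        List<Tuple> tuples = new ArrayList<Tuple>(parts.values());
        try {
            processTuple(tuples);
        } catch (Exception exc) {
            logger.error("There was an exception processing Tuples [" + tuples + "]", exc);
        }
    }
    // fix #1 (anchoring) lives in the upstream bolts: they must emit with
    // collector.emit(inputTuple, new Values(...)) so the tuple tree reaches here.
    getOutputCollector().ack(tuple);
}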
Re: Externalize storm.yaml file
Use soft links. Prasun Sent from Galaxy Nexus On May 14, 2014 6:50 AM, Neha Jain neha_sj...@persistent.co.in wrote: Hello, I have created a Storm cluster on Amazon EC2 machines. The requirement we have is to save the Storm configuration file to Amazon S3. There would be a bucket in S3 where the Storm config file would be placed, and the Storm cluster should access that file. Is it possible to configure the config file location in Storm? By default it refers to $STORM_DIR/conf/storm.yaml. Can we configure the conf directory in Storm to point to a different location, so the jobs know where to pick the config file from? Thanks in advance. Regards, Neha
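To make the soft-link suggestion concrete, a minimal sketch; both paths are hypothetical (a Storm install dir and a local copy synced down from the S3 bucket). Operationally the same thing is a one-time ln -s on each node:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class LinkStormYaml {
    public static void main(String[] args) throws Exception {
        Path link = Paths.get("/opt/storm/conf/storm.yaml");   // assumption: Storm install dir
        Path target = Paths.get("/mnt/s3sync/storm.yaml");     // assumption: file synced from S3
        Files.deleteIfExists(link);
        Files.createSymbolicLink(link, target);                // storm.yaml now points at the synced copy
    }
}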
Best practice for shutting down storm
I have a few topologies running. The spout puts the ID of the object it is emitting into a WIP list in Redis. When the spout's ack or fail method is called, it takes the ID out of the WIP list. The environment and application are undergoing a lot of changes, and as a result I'm required to occasionally restart the topology or the Storm cluster itself. The problem is that as I restart, I see quite a few messages left in WIP, which means that for these messages the spout didn't receive any ack or fail. My restart process has been:

1. Kill the topology from the UI (I find killing from the UI is more responsive than from the command line: the killed topology goes off very quickly, whereas if I do it from the command line, the killed topology remains in the list for a long time, hindering my ability to relaunch the topology). I typically kill it with 0 secs wait time (maybe this is where I'm going wrong).
2. Go to each VM and stop (a) the supervisor and (b) the logviewer.
3. Go to nimbus and shut down the UI/nimbus/logviewer.
4. Go to zookeeper and shut down zookeeper.

This I thought was the proper flow, but I doubt that, given the leftover messages I see in WIP. Any thoughts will be helpful.

Thanks, Prasun
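One thing that stands out in step 1 is the 0-second wait: a nonzero wait gives in-flight tuples a chance to be acked (and taken out of WIP) before the workers stop. A minimal sketch of killing with a drain period, assuming the 0.9.x Java client API; the topology name is hypothetical, and the same thing can be done with `storm kill <name> -w 30` from the command line:

import java.util.Map;

import backtype.storm.generated.KillOptions;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class KillWithWait {
    public static void main(String[] args) throws Exception {
        Map conf = Utils.readStormConfig();                       // reads storm.yaml + defaults
        NimbusClient nimbus = NimbusClient.getConfiguredClient(conf);
        KillOptions opts = new KillOptions();
        opts.set_wait_secs(30);                                   // drain instead of killing with 0 secs
        nimbus.getClient().killTopologyWithOpts("myTopology", opts);
    }
}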
Understanding metrics.log
I added metrics to my Storm implementation by implementing IMetric. It is working, and I can see the metrics log populated with all the stats. I have 3 nodes (3 workers) and 1 nimbus/zookeeper in development. In WORKER1's metrics log I can see some metrics with references to WORKER2 and WORKER3. For example:

2014-04-22 16:42:14,829 23002 1398184934 worker3:9953 17:normalizeBolt null {bolt.timeconsumed={8, 1657, 5135, 41}}
2014-04-22 16:42:14,830 23003 1398184934 worker3:9953 38:normalizeBolt null {db.searchip={3, 77, 221, 7}, fetch-doc={3, 32, 101, 8}}
2014-04-22 16:42:14,833 23006 1398184934 worker3:9953 31:normalizeBolt null {bolt.timeconsumed={7, 1638, 5036, 36}}
2014-04-22 16:42:14,833 23006 1398184934 worker3:9953 10:normalizeBolt null {db.searchip={3, 86, 165, 6}, fetch-doc={3, 17, 53, 7}}
2014-04-22 16:42:14,865 23038 1398184934 worker1:9953 44:normalizeSpout null {spout.timeconsumed={656, 8454, 56490, 10}}

My question: why am I seeing the stats of WORKER3 in WORKER1's log file? My expectation was that each worker would log just its own metrics, and that it's up to the log collector (like Splunk or Logstash) to merge and interpret them.

Thanks, Prasun
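For context, a minimal sketch of how metrics are typically wired up in 0.9.x (the metric name and bucket size are illustrative). The consumer registered on the Config runs as its own hidden bolt with its own tasks, so every worker sends its metric buckets to those consumer tasks, which would explain one worker's metrics.log containing entries for all workers:

import backtype.storm.Config;
import backtype.storm.metric.LoggingMetricsConsumer;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.TopologyContext;

public class MetricsWiring {

    // In a bolt's prepare() / spout's open():
    static CountMetric registerMetrics(TopologyContext context) {
        CountMetric timeConsumed = new CountMetric();
        // bucket size 60s: getValueAndReset() is called once a minute
        return context.registerMetric("bolt.timeconsumed", timeConsumed, 60);
    }

    // When building the topology:
    static Config topologyConf() {
        Config conf = new Config();
        // One consumer task; all workers route their metric buckets to it,
        // so metrics.log on that task's worker covers the whole topology.
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
        return conf;
    }
}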