OK, I realized that I had NOT checked whether ShellSpout.die() was throwing a RuntimeException. I added a try/catch block, and it is! The RuntimeException prevents _process.destroy() and System.exit() from running, and both need to run for the topology to recover.
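To make the failure mode concrete, here is a minimal sketch of the pattern. It is not the actual ShellSpout source: `ErrorReporter`, `destroyProcess` and `haltWorker` are hypothetical stand-ins for `_collector.reportError(...)`, `_process.destroy()` and `System.exit()`, and it uses Java 8 lambdas for brevity. It only shows that wrapping the reporting call in try/finally lets the cleanup steps run even when reporting throws:

```java
import java.util.ArrayList;
import java.util.List;

class DieSketch {
    /** Hypothetical stand-in for the collector's error-reporting call. */
    interface ErrorReporter {
        void reportError(Throwable error);
    }

    /**
     * Reports the error, then always runs the cleanup steps.
     * Returns the steps that ran, in order, so the behavior can be inspected.
     */
    static List<String> die(Throwable cause, ErrorReporter reporter,
                            Runnable destroyProcess, Runnable haltWorker) {
        List<String> steps = new ArrayList<>();
        try {
            reporter.reportError(cause);
            steps.add("reported");
        } catch (RuntimeException e) {
            // Reporting is treated as best effort: note the failure and keep going.
            steps.add("report-failed");
        } finally {
            // Cleanup runs whether or not reporting succeeded.
            destroyProcess.run();
            steps.add("destroyed");
            haltWorker.run();
            steps.add("halted");
        }
        return steps;
    }

    public static void main(String[] args) {
        // A reporter that fails the way my log shows: an InterruptedException
        // wrapped in a RuntimeException.
        ErrorReporter failing = error -> {
            throw new RuntimeException(new InterruptedException());
        };
        List<String> steps = die(new RuntimeException("subprocess heartbeat timeout"),
                failing, () -> {}, () -> {});
        System.out.println(steps); // prints [report-failed, destroyed, halted]
    }
}
```

Without the try/finally, the exception from the reporter would propagate out of `die()` before either cleanup step ran, which matches what I'm seeing.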
But, I'm not sure *why* this exception is happening yet, since it's an interrupted exception and I don't think the exception tells me *who* interrupted my thread...

2015-02-12T14:12:35.581-0500 b.s.s.ShellSpout [ERROR] die exception!
java.lang.RuntimeException: java.lang.InterruptedException
    at backtype.storm.util$wrap_in_runtime.invoke(util.clj:44) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:102) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at backtype.storm.zookeeper$exists_node_QMARK_.invoke(zookeeper.clj:98) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at backtype.storm.zookeeper$mkdirs.invoke(zookeeper.clj:114) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at backtype.storm.cluster$mk_distributed_cluster_state$reify__3533.mkdirs(cluster.clj:119) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at backtype.storm.cluster$mk_storm_cluster_state$reify__3990.report_error(cluster.clj:400) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at backtype.storm.daemon.executor$throttled_report_error_fn$fn__5565.invoke(executor.clj:180) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at backtype.storm.daemon.executor$fn__5717$fn$reify__5759.reportError(executor.clj:533) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at backtype.storm.spout.SpoutOutputCollector.reportError(SpoutOutputCollector.java:132) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at backtype.storm.spout.ShellSpout.die(ShellSpout.java:235) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at backtype.storm.spout.ShellSpout.access$200(ShellSpout.java:42) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[na:1.7.0_71]
    at java.lang.Object.wait(Object.java:503) ~[na:1.7.0_71]
    at org.apache.storm.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at org.apache.storm.zookeeper.ZooKeeper.exists(ZooKeeper.java:1040) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at org.apache.storm.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    at backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:101) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
    ... 17 common frames omitted

On Wed, Feb 11, 2015 at 4:27 PM, William Oberman <[email protected]> wrote:

> I'm not sure what I've learned adds up to yet....
>
> I tried setting up a local storm development environment. By mistake, I
> forgot to switch to 0.9.3-branch (e.g. I was working on master at first).
> In master, I was seeing the ShellBolt heartbeat fail first (which in
> retrospect makes sense!). Then I remembered I was trying to debug 0.9.3,
> so I switched to 0.9.3-branch. In the branch, I see ShellSpout heartbeat
> fail first and jam up, so that's good (?). In both cases, I was trying
> local mode rather than cluster mode, as I figured I had a better shot at
> debugging in local mode.
>
> At this point, I figured I had a good test case. This was all using
> command line tools (git, mvn) so far. Two thoughts at this point: 1.) I
> don't plan on running master, and it's not 100% clear to me if ShellBolt
> failing first will solve any problems... 2.) in 0.9.3-branch, after
> ShellSpout fails and everything jams up, if I "ctl-c" the process I
> immediately see the ShellBolt heartbeat timeout message. It's like it was
> waiting to write, but was blocked on something (hrmmmm....?)
>
> But, I hit the limits of my ability to add "System.out" debugging, so I
> tried setting up storm in IntelliJ. That took a bit, but I finally figured
> out how to run a topology in local mode with a debugger. Once again, I was
> seeing ShellSpout heartbeat fail and then nothing happen.
>
> The next problem is that I don't know how to set up IntelliJ to understand
> clojure compiled code (at least, I think that's my problem....) so the
> "Step Into/Out of" information is really weird in the debugger. The
> best/most complete stack trace I have is:
> invoke():102, zookeeper$exists_node_QMARK_$fn__3279 (backtype.storm)
> invoke():98, zookeeper$exists_node_QMARK_ (backtype.storm)
> invoke():114, zookeeper$mkdirs (backtype.storm)
> mkdirs():119, cluster$mk_distributed_cluster_state$reify__3526 (backtype.storm)
> report_error():397, cluster$mk_storm_cluster_state$reify__3983 (backtype.storm)
> invoke():180, executor$throttled_report_error_fn$fn__5548 (backtype.storm.daemon)
> reportError():533, executor$fn__5700$fn$reify__5742 (backtype.storm.daemon)
> reportError():132, SpoutOutputCollector (backtype.storm.spout)
>
> I had a better stack trace (that I lost) that led into:
> org.apache.storm.curator.RetryLoop.callWithRetry()
> which for me is my "prime suspect" (based on name alone) for something
> that is blocking things up.... :-)
>
> Though, once again, not understanding the big picture of storm, I have no
> idea what all of the above adds up to in terms of what's wrong, and how to
> fix it still....
>
> will
>
> On Wed, Feb 11, 2015 at 1:31 PM, William Oberman <[email protected]> wrote:
>
>> I'm glad to hear I'm not the only one!
>>
>> (no new news yet)
>>
>>
>> On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <[email protected]> wrote:
>>
>>> Hi William,
>>>
>>> I'm having the same problem running a multilang topology (written in
>>> python). If you find a solution, please post it here, it will sure help us.
>>>
>>> To upgrade from 0.9.2-incubating we updated storm.py (
>>> https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
>>> and pom.xml.
>>>
>>> Downgrading to 0.9.2-incubating (downgrading storm.py and pom.xml), it
>>> works like hell.
>>>
>>> Best regards,
>>>
>>> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <
>>> [email protected]> wrote:
>>>
>>>> I'm not sure the best way to share a test case. I'll copy and paste
>>>> code below....
>>>> If you run the below code (and find the worker that was
>>>> running its log file), you should see in ~30 seconds:
>>>> ====
>>>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>>> =======
>>>>
>>>> But, the topology will run in a kind of weird zombie state forever.
>>>> More specifically I see the multilang bolt process all tuples in the
>>>> pending queue, and then an infinite loop of nextTuple()/fail() from the
>>>> multilang spout. But, as noted in my original email, if I comment out:
>>>> _collector.reportError(exception);
>>>> in the Java ShellSpout then the worker will immediately die and respawn.
>>>>
>>>> If no one can help, the next step for me is rough, as I'll have to
>>>> learn how to actually develop and debug storm itself, which is usually at
>>>> least 10x harder than just using something :-)
>>>>
>>>> In any case, my test code:
>>>>
>>>> Topology = 1 process with two tasks (multilang spout and bolt), and
>>>> small pool of pending messages (yes, using the word count example in
>>>> storm-starter as a starting point....)
>>>> ============= >>>> public class SlowTopology { >>>> public static class SlowPhpBolt extends ShellBolt implements >>>> IRichBolt { >>>> >>>> public SlowPhpBolt() { >>>> super("php", "slowBolt.php"); >>>> } >>>> >>>> @Override >>>> public void declareOutputFields(OutputFieldsDeclarer declarer) { >>>> declarer.declare(new Fields()); >>>> } >>>> >>>> @Override >>>> public Map<String, Object> getComponentConfiguration() { >>>> return null; >>>> } >>>> } >>>> >>>> public static class SlowPhpSpout extends ShellSpout implements >>>> IRichSpout { >>>> >>>> public SlowPhpSpout() { >>>> super("php", "slowSpout.php"); >>>> } >>>> >>>> @Override >>>> public void declareOutputFields(OutputFieldsDeclarer ofd) { >>>> ofd.declare(new Fields("output")); >>>> } >>>> >>>> @Override >>>> public Map<String, Object> getComponentConfiguration() { >>>> return null; >>>> } >>>> } >>>> >>>> public static void main(String[] args) throws Exception { >>>> >>>> TopologyBuilder builder = new TopologyBuilder(); >>>> >>>> builder.setSpout("spout", new SlowPhpSpout(), >>>> 1).setNumTasks(1).setMaxSpoutPending(3); >>>> builder.setBolt("bolt", new SlowPhpBolt(), >>>> 1).setNumTasks(1).shuffleGrouping("spout"); >>>> >>>> Config conf = new Config(); >>>> conf.setDebug(true); >>>> >>>> if (args != null && args.length > 0) { >>>> conf.setNumWorkers(1); >>>> StormSubmitter.submitTopologyWithProgressBar(args[0], conf, >>>> builder.createTopology()); >>>> } >>>> else { >>>> LocalCluster cluster = new LocalCluster(); >>>> cluster.submitTopology("slow", conf, builder.createTopology()); >>>> Thread.sleep(10000); >>>> cluster.shutdown(); >>>> } >>>> } >>>> } >>>> =========== >>>> >>>> slowSpout.php >>>> ========== >>>> <?php >>>> require_once "storm.php"; >>>> class slowSpout extends \ShellSpout { >>>> protected function nextTuple() { >>>> $value = rand(0,100); >>>> $id = rand(0, 100); >>>> $this->emit(array($value), $id); >>>> file_put_contents("/tmp/storm_slow.log", >>>> "nextTuple()->value[$value] 
id[$id]\n", FILE_APPEND); >>>> sleep(1); >>>> } >>>> protected function ack($id) { >>>> file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND); >>>> } >>>> >>>> protected function fail($id) { >>>> file_put_contents("/tmp/storm_slow.log", "fail($id)\n", >>>> FILE_APPEND); >>>> } >>>> } >>>> >>>> (new slowSpout())->run(); >>>> =========== >>>> >>>> slowBolt.php >>>> ============ >>>> <?php >>>> require_once "storm.php"; >>>> class slowBolt extends \BasicBolt { >>>> protected function process(\Tuple $t) { >>>> $sleep = rand(1, 180); >>>> file_put_contents("/tmp/storm_slow.log", "process(".print_r($t, >>>> true)."), sleeping for sleep[$sleep]\n", FILE_APPEND); >>>> sleep($sleep); >>>> } >>>> } >>>> (new slowBolt())->run(); >>>> ============ >>>> >>>> storm.php (from https://github.com/lazyshot/storm-php, and I think I >>>> added more error checking on reads/writes to standard in/out, added sync() >>>> to the ShellSpout to make new classes easier to write, and the new >>>> heartbeat protocol) >>>> ========= >>>> <?php >>>> interface iShellBolt { >>>> } >>>> >>>> interface iShellSpout { >>>> } >>>> >>>> class Tuple { >>>> public $id, $component, $stream, $task, $values; >>>> >>>> public function __construct($id, $component, $stream, $task, >>>> $values) { >>>> $this->id = $id; >>>> $this->component = $component; >>>> $this->stream = $stream; >>>> $this->task = $task; >>>> $this->values = $values; >>>> } >>>> } >>>> >>>> abstract class ShellComponent { >>>> protected $pid; >>>> protected $stormConf; >>>> protected $topologyContext; >>>> >>>> protected $stormInc = null; >>>> >>>> public function __construct() { >>>> $this->pid = getmypid(); >>>> $this->sendCommand(array("pid" => $this->pid)); >>>> >>>> $handshake = $this->parseMessage($this->waitForMessage()); >>>> >>>> $this->stormConf = $handshake['conf']; >>>> $this->topologyContext = $handshake['context']; >>>> $pidDir = $handshake['pidDir']; >>>> >>>> @fclose(@fopen($pidDir . "/" . 
$this->pid, "w")); >>>> } >>>> >>>> protected function readLine() { >>>> $raw = fgets(STDIN); >>>> >>>> if ($raw === false) { >>>> throw new Exception("STDIN is broken"); >>>> } >>>> >>>> $line = trim($raw); >>>> >>>> return $line; >>>> } >>>> >>>> protected function waitForMessage() { >>>> $message = ''; >>>> while (true) { >>>> $line = trim($this->readLine()); >>>> >>>> if (strlen($line) == 0) { >>>> continue; >>>> } else if ($line == 'end') { >>>> break; >>>> } else if ($line == 'sync') { >>>> $message = ''; >>>> continue; >>>> } >>>> >>>> $message .= $line . "\n"; >>>> } >>>> >>>> return trim($message); >>>> } >>>> >>>> protected function sendCommand(array $command) { >>>> $this->sendMessage(json_encode($command)); >>>> } >>>> >>>> protected function sendLog($message) { >>>> return $this->sendCommand(array( >>>> 'command' => 'log', >>>> 'msg' => $message >>>> )); >>>> } >>>> >>>> protected function parseMessage($message) { >>>> $msg = json_decode($message, true); >>>> >>>> if ($msg) { >>>> return $msg; >>>> } else { >>>> return $message; >>>> } >>>> } >>>> >>>> protected function sendMessage($message) { >>>> $message = "$message\nend\n"; >>>> $bytesWritten = fwrite(STDOUT, $message); >>>> fflush(STDOUT); >>>> if ($bytesWritten === false) { >>>> throw new Exception("STDOUT is broken"); >>>> } >>>> if ($bytesWritten != strlen($message)) { >>>> throw new Exception("Unable to write all bytes to STDOUT >>>> (message=$message, bytesWritten=$bytesWritten)"); >>>> } >>>> } >>>> >>>> final protected function sync() { >>>> $command = array( >>>> 'command' => 'sync', >>>> ); >>>> >>>> $this->sendCommand($command); >>>> } >>>> >>>> } >>>> >>>> abstract class ShellBolt extends ShellComponent implements iShellBolt { >>>> >>>> public $anchor_tuple = null; >>>> >>>> public function __construct() { >>>> parent::__construct(); >>>> >>>> $this->init($this->stormConf, $this->topologyContext); >>>> } >>>> >>>> public function run() { >>>> try { >>>> while (true) { >>>> $command = 
$this->parseMessage($this->waitForMessage()); >>>> >>>> if (is_array($command)) { >>>> if (isset($command['tuple'])) { >>>> $tupleMap = array_merge(array( >>>> 'id' => null, >>>> 'comp' => null, >>>> 'stream' => null, >>>> 'task' => null, >>>> 'tuple' => null >>>> ), >>>> >>>> $command); >>>> >>>> if($tupleMap['task'] == -1 && >>>> $tupleMap['stream'] == "__heartbeat") { >>>> $this->sync(); >>>> } else { >>>> $tuple = new Tuple($tupleMap['id'], >>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], >>>> $tupleMap['tuple']); >>>> $this->process($tuple); >>>> } >>>> } >>>> } >>>> } >>>> } catch (Exception $e) { >>>> $this->sendLog((string)$e); >>>> } >>>> } >>>> >>>> abstract protected function process(Tuple $tuple); >>>> >>>> protected function init($conf, $topology) { >>>> return; >>>> } >>>> >>>> protected function emitTuple(array $tuple, $stream = null, $anchors >>>> = array(), $directTask = null) { >>>> if ($this->anchor_tuple !== null) { >>>> $anchors = array($this->anchor_tuple); >>>> } >>>> >>>> $command = array( >>>> 'command' => 'emit' >>>> ); >>>> >>>> if ($stream !== null) { >>>> $command['stream'] = $stream; >>>> } >>>> >>>> $command['anchors'] = array_map(function ($a) { >>>> return $a->id; >>>> }, $anchors); >>>> >>>> if ($directTask !== null) { >>>> $command['task'] = $directTask; >>>> } >>>> >>>> $command['tuple'] = $tuple; >>>> >>>> $this->sendCommand($command); >>>> } >>>> >>>> protected function emit($tuple, $stream = null, $anchors = array()) >>>> { >>>> $this->emitTuple($tuple, $stream, $anchors); >>>> } >>>> >>>> protected function emitDirect($directTask, $tuple, $stream = null, >>>> $anchors = array()) { >>>> $this->emitTuple($tuple, $stream, $anchors, $directTask); >>>> } >>>> >>>> protected function ack(Tuple $tuple) { >>>> $command = array( >>>> 'command' => 'ack', >>>> 'id' => $tuple->id >>>> ); >>>> >>>> $this->sendCommand($command); >>>> } >>>> >>>> protected function fail(Tuple $tuple) { >>>> $command = array( >>>> 'command' => 
'fail', >>>> 'id' => $tuple->id >>>> ); >>>> >>>> $this->sendCommand($command); >>>> } >>>> } >>>> >>>> abstract class BasicBolt extends ShellBolt { >>>> public function run() { >>>> try { >>>> while (true) { >>>> $command = $this->parseMessage($this->waitForMessage()); >>>> >>>> if (is_array($command)) { >>>> if (isset($command['tuple'])) { >>>> $tupleMap = array_merge(array( >>>> 'id' => null, >>>> 'comp' => null, >>>> 'stream' => null, >>>> 'task' => null, >>>> 'tuple' => null >>>> ), >>>> >>>> $command); >>>> >>>> if($tupleMap['task'] == -1 && >>>> $tupleMap['stream'] == "__heartbeat") { >>>> $this->sync(); >>>> } else { >>>> $tuple = new Tuple($tupleMap['id'], >>>> $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], >>>> $tupleMap['tuple']); >>>> >>>> $this->anchor_tuple = $tuple; >>>> >>>> try { >>>> $processed = $this->process($tuple); >>>> >>>> $this->ack($tuple); >>>> } catch (BoltProcessException $e) { >>>> $this->fail($tuple); >>>> } >>>> } >>>> } >>>> } >>>> } >>>> } catch (Exception $e) { >>>> $this->sendLog((string)$e); >>>> } >>>> >>>> } >>>> } >>>> >>>> abstract class ShellSpout extends ShellComponent implements iShellSpout >>>> { >>>> protected $tuples = array(); >>>> >>>> public function __construct() { >>>> parent::__construct(); >>>> >>>> $this->init($this->stormConf, $this->topologyContext); >>>> } >>>> >>>> >>>> abstract protected function nextTuple(); >>>> >>>> abstract protected function ack($tuple_id); >>>> >>>> abstract protected function fail($tuple_id); >>>> >>>> public function run() { >>>> try { >>>> while (true) { >>>> $command = $this->parseMessage($this->waitForMessage()); >>>> >>>> if (is_array($command)) { >>>> if (isset($command['command'])) { >>>> if ($command['command'] == 'ack') { >>>> $this->ack($command['id']); >>>> $this->sync(); >>>> } else if ($command['command'] == 'fail') { >>>> $this->fail($command['id']); >>>> $this->sync(); >>>> } else if ($command['command'] == 'next') { >>>> $this->nextTuple(); >>>> 
$this->sync(); >>>> } >>>> } >>>> } >>>> } >>>> } catch (Exception $e) { >>>> $this->sendLog((string)$e); >>>> $this->sync(); >>>> } >>>> } >>>> >>>> protected function init($stormConf, $topologyContext) { >>>> return; >>>> } >>>> >>>> final protected function emit(array $tuple, $messageId = null, >>>> $streamId = null) { >>>> return $this->emitTuple($tuple, $messageId, $streamId, null); >>>> } >>>> >>>> final protected function emitDirect($directTask, array $tuple, >>>> $messageId = null, $streamId = null) { >>>> return $this->emitTuple($tuple, $messageId, $streamId, >>>> $directTask); >>>> } >>>> >>>> final private function emitTuple(array $tuple, $messageId = null, >>>> $streamId = null, $directTask = null) { >>>> $command = array( >>>> 'command' => 'emit' >>>> ); >>>> >>>> if ($messageId !== null) { >>>> $command['id'] = $messageId; >>>> } >>>> >>>> if ($streamId !== null) { >>>> $command['stream'] = $streamId; >>>> } >>>> >>>> if ($directTask !== null) { >>>> $command['task'] = $directTask; >>>> } >>>> >>>> $command['tuple'] = $tuple; >>>> >>>> return $this->sendCommand($command); >>>> } >>>> } >>>> >>>> class BoltProcessException extends Exception { >>>> } >>>> >>>> ========================= >>>> >>>> >>>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman < >>>> [email protected]> wrote: >>>> >>>>> Hi, >>>>> >>>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234. >>>>> >>>>> I'll try to cover the important facts that led to this issue: >>>>> >>>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some >>>>> existing business logic >>>>> >>>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition >>>>> to the ShellBolt protocol) >>>>> >>>>> -I have some odd topologies where I try to do some legacy background >>>>> processing. This processing takes a highly variable amount time in the >>>>> Bolts, from milliseconds to minutes. 
>>>>> But, eventually due to randomness the
>>>>> spout's "pending" pool fills up, causing the spout to block on nextTuple,
>>>>> which eventually causes a heartbeat timeout. (I believe my only fix is to
>>>>> increase the heartbeat timeout at the topology level. That's not the
>>>>> purpose of this email, though confirmation of this as my only workaround
>>>>> would be appreciated! I feel like this wasn't anticipated when the
>>>>> heartbeat patch was designed, as it was assumed the spout's nextTuple
>>>>> wouldn't block I guess?)
>>>>>
>>>>> -The purpose of this email is the fact that the topology "jams up"
>>>>> when the ShellSpout has a heartbeat timeout. I can see my PHP spout/bolt
>>>>> still running (I added logging to them), but Storm itself is doing
>>>>> nothing.
>>>>>
>>>>> -I added logging to ShellSpout and recompiled, because I saw the log
>>>>> message on line 233 (Halting process: ShellSpout died) but as noted the
>>>>> PHP process was still running, so I was curious if _process.destroy(); failed.
>>>>> But, my logging didn't appear. I assumed I was compiling/deploying wrong.
>>>>> Eventually I commented out line 234: _collector.reportError(exception);
>>>>> and everything started working!!!
>>>>>
>>>>> Does this make *any* sense? Why would
>>>>> _collector.reportError(exception); block and never return (I waited quite
>>>>> a long time, tens of minutes)? When I comment out line 234, Storm
>>>>> immediately kills my bad tasks and respawns almost instantly.
>>>>>
>>>>> I feel fairly confident that this will be recreatable. My topology:
>>>>> -1 spout (ShellSpout)
>>>>> -1 bolt (ShellBolt)
>>>>> -The ShellSpout has a heartbeat timeout due to slow tasks in ShellBolt
>>>>> + the pending queue is full
>>>>>
>>>>> Thanks for any feedback!
>>>>>
>>>>> will
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Alex Sobrino Beltrán
>>> Registered Linux User #273657
>>>
>>> http://v5tech.es
>>>
>>
>>
>>
>>
>
>
