OK, I realized that I had NOT checked whether ShellSpout.die() was throwing a
RuntimeException. I added a try/catch block, and it is! The
RuntimeException prevents _process.destroy() and System.exit() from
running, and both of them need to run for the topology to recover.
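A minimal sketch of the ordering this points to, assuming the only goal is that a failing error report must never skip cleanup. ErrorReporter and SafeDie are hypothetical names (not Storm classes); the two Runnables stand in for _process.destroy() and System.exit() so the logic can be exercised without killing the JVM.

```java
// Hypothetical stand-in for SpoutOutputCollector.reportError(); not Storm's API.
interface ErrorReporter {
    void reportError(Throwable error);
}

// Sketch of a die() path in which a throwing reportError() can no longer
// skip the cleanup that lets the supervisor restart the worker.
class SafeDie {
    private final ErrorReporter reporter;
    private final Runnable destroyProcess; // stands in for _process.destroy()
    private final Runnable haltWorker;     // stands in for System.exit()

    SafeDie(ErrorReporter reporter, Runnable destroyProcess, Runnable haltWorker) {
        this.reporter = reporter;
        this.destroyProcess = destroyProcess;
        this.haltWorker = haltWorker;
    }

    void die(Throwable cause) {
        try {
            // Best effort only: this may throw, e.g. a RuntimeException
            // wrapping an InterruptedException from the ZooKeeper client.
            reporter.reportError(cause);
        } catch (RuntimeException reportFailure) {
            System.err.println("reportError failed: " + reportFailure);
        } finally {
            // Reached whether or not reporting succeeded.
            destroyProcess.run();
            haltWorker.run();
        }
    }
}
```

This only covers the throwing case. If reportError() instead blocks indefinitely (the "never returns" symptom from earlier in the thread), the report would also need a timeout or a separate thread.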

But I'm not sure *why* this exception is happening yet: it's an
InterruptedException, and I don't think the exception tells me *who*
interrupted my thread...
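The JDK does not record who called Thread.interrupt(), but one debugging approach, sketched here under the assumption that you can control the ThreadFactory of the executor running the heartbeat task, is to hand that executor threads which capture the caller's stack whenever interrupt() is invoked. ThreadPoolExecutor interrupts its workers through Thread.interrupt(), so an override sees those calls. InterruptTracingThreadFactory is a hypothetical debugging helper, not part of Storm.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical debugging aid: threads created by this factory remember
// the stack trace of whoever last called interrupt() on them.
class InterruptTracingThreadFactory implements ThreadFactory {
    private final AtomicInteger count = new AtomicInteger();

    static class TracingThread extends Thread {
        volatile Throwable lastInterrupter;

        TracingThread(Runnable r, String name) {
            super(r, name);
        }

        @Override
        public void interrupt() {
            // Runs on the *calling* thread, so this captures the interrupter's stack.
            lastInterrupter = new Throwable("interrupt() called by thread "
                    + Thread.currentThread().getName());
            super.interrupt();
        }
    }

    @Override
    public Thread newThread(Runnable r) {
        return new TracingThread(r, "traced-" + count.incrementAndGet());
    }

    // Call from a catch block on a traced thread; null if not traced or never interrupted.
    static Throwable whoInterruptedMe() {
        Thread t = Thread.currentThread();
        return (t instanceof TracingThread) ? ((TracingThread) t).lastInterrupter : null;
    }
}
```

For example, a pool built with `new ScheduledThreadPoolExecutor(1, new InterruptTracingThreadFactory())` could log `whoInterruptedMe()` from the same catch block that prints "die exception!", since that catch runs on the interrupted pool thread.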

2015-02-12T14:12:35.581-0500 b.s.s.ShellSpout [ERROR] die exception!
java.lang.RuntimeException: java.lang.InterruptedException
        at backtype.storm.util$wrap_in_runtime.invoke(util.clj:44) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:102) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.zookeeper$exists_node_QMARK_.invoke(zookeeper.clj:98) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.zookeeper$mkdirs.invoke(zookeeper.clj:114) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.cluster$mk_distributed_cluster_state$reify__3533.mkdirs(cluster.clj:119) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.cluster$mk_storm_cluster_state$reify__3990.report_error(cluster.clj:400) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.daemon.executor$throttled_report_error_fn$fn__5565.invoke(executor.clj:180) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.daemon.executor$fn__5717$fn$reify__5759.reportError(executor.clj:533) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.spout.SpoutOutputCollector.reportError(SpoutOutputCollector.java:132) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.spout.ShellSpout.die(ShellSpout.java:235) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.spout.ShellSpout.access$200(ShellSpout.java:42) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.lang.InterruptedException: null
        at java.lang.Object.wait(Native Method) ~[na:1.7.0_71]
        at java.lang.Object.wait(Object.java:503) ~[na:1.7.0_71]
        at org.apache.storm.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.zookeeper.ZooKeeper.exists(ZooKeeper.java:1040) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at org.apache.storm.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        at backtype.storm.zookeeper$exists_node_QMARK_$fn__3279.invoke(zookeeper.clj:101) ~[storm-core-0.9.3.jar:0.9.4-SNAPSHOT]
        ... 17 common frames omitted
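Reading the trace: the InterruptedException surfaces in the same pool thread that runs SpoutHeartbeatTimerTask, while that task is still inside die(). One JDK behavior that produces exactly this shape, offered as a hypothesis to check against the ShellSpout source rather than a confirmed diagnosis, is a task calling shutdownNow() on its own executor before doing blocking work: shutdownNow() interrupts every running worker, including the calling thread. SelfInterruptDemo is a hypothetical, self-contained reproduction; Thread.sleep() stands in for the blocking ZooKeeper call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Shows that a scheduled task which calls shutdownNow() on its own executor
// interrupts itself, so its next blocking call throws InterruptedException.
class SelfInterruptDemo {
    static boolean taskWasInterrupted() {
        ScheduledExecutorService timer = new ScheduledThreadPoolExecutor(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);

        timer.schedule(() -> {
            try {
                timer.shutdownNow();   // also interrupts the current worker thread
                Thread.sleep(1_000);   // stands in for a blocking ZooKeeper call
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                done.countDown();
            }
        }, 10, TimeUnit.MILLISECONDS);

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }
}
```

If the real die() does something similar, the interrupt would come from the heartbeat thread itself, which would also explain why nothing else in the trace looks like an interrupter.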


On Wed, Feb 11, 2015 at 4:27 PM, William Oberman <[email protected]>
wrote:

> I'm not sure yet what everything I've learned adds up to....
>
> I tried setting up a local Storm development environment. At first I
> forgot to switch to 0.9.3-branch (i.e. I was working on master). In
> master, I was seeing the ShellBolt heartbeat fail first (which in
> retrospect makes sense!). Then I remembered I was trying to debug 0.9.3,
> so I switched to 0.9.3-branch. In the branch, I see the ShellSpout
> heartbeat fail first and jam up, so that's good (?). In both cases, I used
> local mode rather than cluster mode, as I figured I had a better shot at
> debugging in local mode.
>
> At this point, I figured I had a good test case. So far this was all done
> with command-line tools (git, mvn). Two thoughts at this point:
> 1.) I don't plan on running master, and it's not 100% clear to me whether
> ShellBolt failing first would solve any problems...
> 2.) In 0.9.3-branch, after ShellSpout fails and everything jams up, if I
> Ctrl-C the process I immediately see the ShellBolt heartbeat timeout
> message. It's as if it was waiting to write but was blocked on something
> (hrmmmm....?)
>
> But I hit the limits of what I could learn from "System.out" debugging, so
> I tried setting up Storm in IntelliJ. That took a while, but I finally
> figured out how to run a topology in local mode with a debugger. Once
> again, I saw the ShellSpout heartbeat fail and then nothing happen.
>
> The next problem is that I don't know how to set up IntelliJ to understand
> Clojure-compiled code (at least, I think that's my problem...), so the
> "Step Into/Out of" information is really weird in the debugger. The
> best/most complete stack trace I have is:
> invoke():102, zookeeper$exists_node_QMARK_$fn__3279 (backtype.storm)
> invoke():98, zookeeper$exists_node_QMARK_ (backtype.storm)
> invoke():114, zookeeper$mkdirs (backtype.storm)
> mkdirs():119, cluster$mk_distributed_cluster_state$reify__3526 (backtype.storm)
> report_error():397, cluster$mk_storm_cluster_state$reify__3983 (backtype.storm)
> invoke():180, executor$throttled_report_error_fn$fn__5548 (backtype.storm.daemon)
> reportError():533, executor$fn__5700$fn$reify__5742 (backtype.storm.daemon)
> reportError():132, SpoutOutputCollector (backtype.storm.spout)
>
> I had a better stack trace (which I lost) that led into:
> org.apache.storm.curator.RetryLoop.callWithRetry()
> which is my "prime suspect" (based on the name alone) for whatever is
> blocking things up....  :-)
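In the later trace at the top of this thread, the innermost frame is Object.wait() inside ClientCnxn.submitRequest, reached via RetryLoop.callWithRetry, so the retry loop may only be the messenger. The sketch below is a generic retry-with-backoff loop (SimpleRetry is a hypothetical helper, not Curator's RetryLoop) showing a common convention: transient failures are retried, but an InterruptedException ends the loop immediately and the thread's interrupt flag is restored, so an interrupt surfaces as an exception instead of being retried away.

```java
import java.util.concurrent.Callable;

// Generic retry-with-backoff loop; a hypothetical helper, not Curator's RetryLoop.
final class SimpleRetry {
    static <T> T callWithRetry(Callable<T> op, int maxAttempts, long backoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (InterruptedException e) {
                // An interrupt is a request to stop, not a transient failure:
                // restore the flag and propagate instead of retrying.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e; // treat anything else as transient
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMillis * attempt); // linear backoff
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw ie;
                    }
                }
            }
        }
        throw last; // every attempt failed
    }
}
```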
>
> Still, since I don't understand the big picture of Storm, I have no idea
> yet what all of the above says about what's wrong or how to fix it....
>
> will
>
> On Wed, Feb 11, 2015 at 1:31 PM, William Oberman <[email protected]>
> wrote:
>
>> I'm glad to hear I'm not the only one!
>>
>> (no new news yet)
>>
>>
>> On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <[email protected]> wrote:
>>
>>> Hi William,
>>>
>>> I'm having the same problem running a multilang topology (written in
>>> Python). If you find a solution, please post it here; it would surely help us.
>>>
>>> To upgrade from 0.9.2-incubating we updated storm.py (
>>> https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
>>> and pom.xml.
>>>
>>> After downgrading to 0.9.2-incubating (reverting storm.py and pom.xml), it
>>> works fine.
>>>
>>> Best regards,
>>>
>>> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <
>>> [email protected]> wrote:
>>>
>>>> I'm not sure of the best way to share a test case, so I'll paste the code
>>>> below. If you run it (and find the log file of the worker that was running
>>>> it), you should see this in ~30 seconds:
>>>> ====
>>>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>>> =======
>>>>
>>>> But the topology then runs in a kind of weird zombie state forever.
>>>> More specifically, I see the multilang bolt process all tuples in the
>>>> pending queue, followed by an endless loop of nextTuple()/fail() from the
>>>> multilang spout. But, as noted in my original email, if I comment out:
>>>>  _collector.reportError(exception);
>>>> in the Java ShellSpout, then the worker immediately dies and respawns.
>>>>
>>>> If no one can help, the next step will be rough for me, as I'll have to
>>>> learn how to actually develop and debug Storm itself, which is usually at
>>>> least 10x harder than just using something :-)
>>>>
>>>> In any case, here is my test code.
>>>>
>>>> Topology = 1 process with two tasks (multilang spout and bolt) and a
>>>> small pool of pending messages (yes, I used the word count example in
>>>> storm-starter as a starting point....)
>>>> =============
>>>> import backtype.storm.Config;
>>>> import backtype.storm.LocalCluster;
>>>> import backtype.storm.StormSubmitter;
>>>> import backtype.storm.spout.ShellSpout;
>>>> import backtype.storm.task.ShellBolt;
>>>> import backtype.storm.topology.IRichBolt;
>>>> import backtype.storm.topology.IRichSpout;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.TopologyBuilder;
>>>> import backtype.storm.tuple.Fields;
>>>>
>>>> import java.util.Map;
>>>>
>>>> public class SlowTopology {
>>>>   public static class SlowPhpBolt extends ShellBolt implements IRichBolt {
>>>>
>>>>     public SlowPhpBolt() {
>>>>       super("php", "slowBolt.php");
>>>>     }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>       // This bolt is a sink: it declares no output fields.
>>>>       declarer.declare(new Fields());
>>>>     }
>>>>
>>>>     @Override
>>>>     public Map<String, Object> getComponentConfiguration() {
>>>>       return null;
>>>>     }
>>>>   }
>>>>
>>>>   public static class SlowPhpSpout extends ShellSpout implements IRichSpout {
>>>>
>>>>     public SlowPhpSpout() {
>>>>       super("php", "slowSpout.php");
>>>>     }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>>       ofd.declare(new Fields("output"));
>>>>     }
>>>>
>>>>     @Override
>>>>     public Map<String, Object> getComponentConfiguration() {
>>>>       return null;
>>>>     }
>>>>   }
>>>>
>>>>   public static void main(String[] args) throws Exception {
>>>>
>>>>     TopologyBuilder builder = new TopologyBuilder();
>>>>
>>>>     // One spout task and one bolt task; at most 3 tuples in flight.
>>>>     builder.setSpout("spout", new SlowPhpSpout(), 1).setNumTasks(1).setMaxSpoutPending(3);
>>>>     builder.setBolt("bolt", new SlowPhpBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
>>>>
>>>>     Config conf = new Config();
>>>>     conf.setDebug(true);
>>>>
>>>>     if (args != null && args.length > 0) {
>>>>       // Cluster mode: submit under the given topology name.
>>>>       conf.setNumWorkers(1);
>>>>       StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
>>>>     }
>>>>     else {
>>>>       // Local mode: run in-process for 10 seconds, then shut down.
>>>>       LocalCluster cluster = new LocalCluster();
>>>>       cluster.submitTopology("slow", conf, builder.createTopology());
>>>>       Thread.sleep(10000);
>>>>       cluster.shutdown();
>>>>     }
>>>>   }
>>>> }
>>>> ===========
>>>>
>>>> slowSpout.php
>>>> ==========
>>>> <?php
>>>> require_once "storm.php";
>>>> class slowSpout extends \ShellSpout {
>>>>   protected function nextTuple() {
>>>>     $value = rand(0,100);
>>>>     $id = rand(0, 100);
>>>>     $this->emit(array($value), $id);
>>>>     file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>>>>     sleep(1);
>>>>   }
>>>>   protected function ack($id) {
>>>>     file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
>>>>   }
>>>>
>>>>   protected function fail($id) {
>>>>     file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
>>>>   }
>>>> }
>>>>
>>>> (new slowSpout())->run();
>>>> ===========
>>>>
>>>> slowBolt.php
>>>> ============
>>>> <?php
>>>> require_once "storm.php";
>>>> class slowBolt extends \BasicBolt {
>>>>   protected function process(\Tuple $t) {
>>>>     $sleep = rand(1, 180);
>>>>     file_put_contents("/tmp/storm_slow.log", "process(".print_r($t, true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
>>>>     sleep($sleep);
>>>>   }
>>>> }
>>>> (new slowBolt())->run();
>>>> ============
>>>>
>>>> storm.php (from https://github.com/lazyshot/storm-php; I think I added
>>>> more error checking on reads/writes to standard in/out, added sync() to
>>>> the ShellSpout to make new classes easier to write, and added the new
>>>> heartbeat protocol)
>>>> =========
>>>> <?php
>>>> interface iShellBolt {
>>>> }
>>>>
>>>> interface iShellSpout {
>>>> }
>>>>
>>>> class Tuple {
>>>>     public $id, $component, $stream, $task, $values;
>>>>
>>>>     public function __construct($id, $component, $stream, $task, $values) {
>>>>         $this->id = $id;
>>>>         $this->component = $component;
>>>>         $this->stream = $stream;
>>>>         $this->task = $task;
>>>>         $this->values = $values;
>>>>     }
>>>> }
>>>>
>>>> abstract class ShellComponent {
>>>>     protected $pid;
>>>>     protected $stormConf;
>>>>     protected $topologyContext;
>>>>
>>>>     protected $stormInc = null;
>>>>
>>>>     public function __construct() {
>>>>         $this->pid = getmypid();
>>>>         $this->sendCommand(array("pid" => $this->pid));
>>>>
>>>>         $handshake = $this->parseMessage($this->waitForMessage());
>>>>
>>>>         $this->stormConf = $handshake['conf'];
>>>>         $this->topologyContext = $handshake['context'];
>>>>         $pidDir = $handshake['pidDir'];
>>>>
>>>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>>>>     }
>>>>
>>>>     protected function readLine() {
>>>>         $raw = fgets(STDIN);
>>>>
>>>>         if ($raw === false) {
>>>>             throw new Exception("STDIN is broken");
>>>>         }
>>>>
>>>>         $line = trim($raw);
>>>>
>>>>         return $line;
>>>>     }
>>>>
>>>>     protected function waitForMessage() {
>>>>         $message = '';
>>>>         while (true) {
>>>>             $line = trim($this->readLine());
>>>>
>>>>             if (strlen($line) == 0) {
>>>>                 continue;
>>>>             } else if ($line == 'end') {
>>>>                 break;
>>>>             } else if ($line == 'sync') {
>>>>                 $message = '';
>>>>                 continue;
>>>>             }
>>>>
>>>>             $message .= $line . "\n";
>>>>         }
>>>>
>>>>         return trim($message);
>>>>     }
>>>>
>>>>     protected function sendCommand(array $command) {
>>>>         $this->sendMessage(json_encode($command));
>>>>     }
>>>>
>>>>     protected function sendLog($message) {
>>>>         return $this->sendCommand(array(
>>>>             'command' => 'log',
>>>>             'msg' => $message
>>>>         ));
>>>>     }
>>>>
>>>>     protected function parseMessage($message) {
>>>>         $msg = json_decode($message, true);
>>>>
>>>>         if ($msg) {
>>>>             return $msg;
>>>>         } else {
>>>>             return $message;
>>>>         }
>>>>     }
>>>>
>>>>     protected function sendMessage($message) {
>>>>         $message = "$message\nend\n";
>>>>         $bytesWritten = fwrite(STDOUT, $message);
>>>>         fflush(STDOUT);
>>>>         if ($bytesWritten === false) {
>>>>             throw new Exception("STDOUT is broken");
>>>>         }
>>>>         if ($bytesWritten != strlen($message)) {
>>>>             throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
>>>>         }
>>>>     }
>>>>
>>>>     final protected function sync() {
>>>>         $command = array(
>>>>             'command' => 'sync',
>>>>         );
>>>>
>>>>         $this->sendCommand($command);
>>>>     }
>>>>
>>>> }
>>>>
>>>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>>>>
>>>>     public $anchor_tuple = null;
>>>>
>>>>     public function __construct() {
>>>>         parent::__construct();
>>>>
>>>>         $this->init($this->stormConf, $this->topologyContext);
>>>>     }
>>>>
>>>>     public function run() {
>>>>         try {
>>>>             while (true) {
>>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>>
>>>>                 if (is_array($command)) {
>>>>                     if (isset($command['tuple'])) {
>>>>                         $tupleMap = array_merge(array(
>>>>                                 'id' => null,
>>>>                                 'comp' => null,
>>>>                                 'stream' => null,
>>>>                                 'task' => null,
>>>>                                 'tuple' => null
>>>>                             ),
>>>>
>>>>                             $command);
>>>>
>>>>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>>>>                             $this->sync();
>>>>                         } else {
>>>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
>>>>                             $this->process($tuple);
>>>>                         }
>>>>                     }
>>>>                 }
>>>>             }
>>>>         } catch (Exception $e) {
>>>>             $this->sendLog((string)$e);
>>>>         }
>>>>     }
>>>>
>>>>     abstract protected function process(Tuple $tuple);
>>>>
>>>>     protected function init($conf, $topology) {
>>>>         return;
>>>>     }
>>>>
>>>>     protected function emitTuple(array $tuple, $stream = null, $anchors = array(), $directTask = null) {
>>>>         if ($this->anchor_tuple !== null) {
>>>>             $anchors = array($this->anchor_tuple);
>>>>         }
>>>>
>>>>         $command = array(
>>>>             'command' => 'emit'
>>>>         );
>>>>
>>>>         if ($stream !== null) {
>>>>             $command['stream'] = $stream;
>>>>         }
>>>>
>>>>         $command['anchors'] = array_map(function ($a) {
>>>>             return $a->id;
>>>>         }, $anchors);
>>>>
>>>>         if ($directTask !== null) {
>>>>             $command['task'] = $directTask;
>>>>         }
>>>>
>>>>         $command['tuple'] = $tuple;
>>>>
>>>>         $this->sendCommand($command);
>>>>     }
>>>>
>>>>     protected function emit($tuple, $stream = null, $anchors = array()) {
>>>>         $this->emitTuple($tuple, $stream, $anchors);
>>>>     }
>>>>
>>>>     protected function emitDirect($directTask, $tuple, $stream = null, $anchors = array()) {
>>>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
>>>>     }
>>>>
>>>>     protected function ack(Tuple $tuple) {
>>>>         $command = array(
>>>>             'command' => 'ack',
>>>>             'id' => $tuple->id
>>>>         );
>>>>
>>>>         $this->sendCommand($command);
>>>>     }
>>>>
>>>>     protected function fail(Tuple $tuple) {
>>>>         $command = array(
>>>>             'command' => 'fail',
>>>>             'id' => $tuple->id
>>>>         );
>>>>
>>>>         $this->sendCommand($command);
>>>>     }
>>>> }
>>>>
>>>> abstract class BasicBolt extends ShellBolt {
>>>>     public function run() {
>>>>         try {
>>>>             while (true) {
>>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>>
>>>>                 if (is_array($command)) {
>>>>                     if (isset($command['tuple'])) {
>>>>                         $tupleMap = array_merge(array(
>>>>                                 'id' => null,
>>>>                                 'comp' => null,
>>>>                                 'stream' => null,
>>>>                                 'task' => null,
>>>>                                 'tuple' => null
>>>>                             ),
>>>>
>>>>                             $command);
>>>>
>>>>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>>>>                             $this->sync();
>>>>                         } else {
>>>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
>>>>
>>>>                             $this->anchor_tuple = $tuple;
>>>>
>>>>                             try {
>>>>                                 $processed = $this->process($tuple);
>>>>
>>>>                                 $this->ack($tuple);
>>>>                             } catch (BoltProcessException $e) {
>>>>                                 $this->fail($tuple);
>>>>                             }
>>>>                         }
>>>>                     }
>>>>                 }
>>>>             }
>>>>         } catch (Exception $e) {
>>>>             $this->sendLog((string)$e);
>>>>         }
>>>>
>>>>     }
>>>> }
>>>>
>>>> abstract class ShellSpout extends ShellComponent implements iShellSpout {
>>>>     protected $tuples = array();
>>>>
>>>>     public function __construct() {
>>>>         parent::__construct();
>>>>
>>>>         $this->init($this->stormConf, $this->topologyContext);
>>>>     }
>>>>
>>>>
>>>>     abstract protected function nextTuple();
>>>>
>>>>     abstract protected function ack($tuple_id);
>>>>
>>>>     abstract protected function fail($tuple_id);
>>>>
>>>>     public function run() {
>>>>         try {
>>>>             while (true) {
>>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>>
>>>>                 if (is_array($command)) {
>>>>                     if (isset($command['command'])) {
>>>>                         if ($command['command'] == 'ack') {
>>>>                             $this->ack($command['id']);
>>>>                             $this->sync();
>>>>                         } else if ($command['command'] == 'fail') {
>>>>                             $this->fail($command['id']);
>>>>                             $this->sync();
>>>>                         } else if ($command['command'] == 'next') {
>>>>                             $this->nextTuple();
>>>>                             $this->sync();
>>>>                         }
>>>>                     }
>>>>                 }
>>>>             }
>>>>         } catch (Exception $e) {
>>>>             $this->sendLog((string)$e);
>>>>             $this->sync();
>>>>         }
>>>>     }
>>>>
>>>>     protected function init($stormConf, $topologyContext) {
>>>>         return;
>>>>     }
>>>>
>>>>     final protected function emit(array $tuple, $messageId = null, $streamId = null) {
>>>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
>>>>     }
>>>>
>>>>     final protected function emitDirect($directTask, array $tuple, $messageId = null, $streamId = null) {
>>>>         return $this->emitTuple($tuple, $messageId, $streamId, $directTask);
>>>>     }
>>>>
>>>>     final private function emitTuple(array $tuple, $messageId = null, $streamId = null, $directTask = null) {
>>>>         $command = array(
>>>>             'command' => 'emit'
>>>>         );
>>>>
>>>>         if ($messageId !== null) {
>>>>             $command['id'] = $messageId;
>>>>         }
>>>>
>>>>         if ($streamId !== null) {
>>>>             $command['stream'] = $streamId;
>>>>         }
>>>>
>>>>         if ($directTask !== null) {
>>>>             $command['task'] = $directTask;
>>>>         }
>>>>
>>>>         $command['tuple'] = $tuple;
>>>>
>>>>         return $this->sendCommand($command);
>>>>     }
>>>> }
>>>>
>>>> class BoltProcessException extends Exception {
>>>> }
>>>>
>>>> =========================
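To make the wire format used by the PHP helper above explicit, here is a Java rendering of its waitForMessage()/sendMessage() framing. It is a sketch that mirrors the PHP code, not Storm's own multilang serializer: each message is sent as text followed by a line containing only "end", blank lines are skipped, and (following the PHP code) a "sync" line discards anything buffered so far. MultilangFraming is a hypothetical name.

```java
import java.io.BufferedReader;
import java.io.IOException;

// Java rendering of the PHP waitForMessage()/sendMessage() framing above.
final class MultilangFraming {
    // Reads lines until a line equal to "end" and returns the trimmed body.
    static String readMessage(BufferedReader in) throws IOException {
        StringBuilder msg = new StringBuilder();
        while (true) {
            String raw = in.readLine();
            if (raw == null) {
                throw new IOException("STDIN is broken"); // stream closed mid-message
            }
            String line = raw.trim();
            if (line.isEmpty()) {
                continue;             // skip blank lines
            } else if (line.equals("end")) {
                break;                // end of one message
            } else if (line.equals("sync")) {
                msg.setLength(0);     // mirrors the PHP reset on "sync"
                continue;
            }
            msg.append(line).append('\n');
        }
        return msg.toString().trim();
    }

    // What sendMessage() writes for one JSON payload.
    static String frame(String json) {
        return json + "\nend\n";
    }
}
```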
>>>>
>>>>
>>>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>>>>>
>>>>> I'll try to cover the important facts that led to this issue:
>>>>>
>>>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some
>>>>> existing business logic
>>>>>
>>>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition
>>>>> to the ShellBolt protocol)
>>>>>
>>>>> -I have some odd topologies where I try to do some legacy background
>>>>> processing. This processing takes a highly variable amount of time in the
>>>>> Bolts, from milliseconds to minutes. But eventually, due to randomness, the
>>>>> spout's "pending" pool fills up, causing the spout to block on nextTuple,
>>>>> which eventually causes a heartbeat timeout. (I believe my only fix is to
>>>>> increase the heartbeat timeout at the topology level. That's not the
>>>>> purpose of this email, though confirmation that this is my only workaround
>>>>> would be appreciated! I suspect this case wasn't anticipated when the
>>>>> heartbeat patch was designed; I guess it was assumed that the spout's
>>>>> nextTuple wouldn't block?)
>>>>>
>>>>> -The purpose of this email is that the topology "jams up" when the
>>>>> ShellSpout has a heartbeat timeout. I can see my PHP spout/bolt still
>>>>> running (I added logging to them), but Storm itself is doing nothing.
>>>>>
>>>>> -I added logging to ShellSpout and recompiled, because I saw the log
>>>>> message on line 233 (Halting process: ShellSpout died) but, as noted, the
>>>>> PHP process was still running, so I was curious whether _process.destroy()
>>>>> had failed. But my logging didn't appear, and I assumed I was
>>>>> compiling/deploying wrong. Eventually I commented out line 234:
>>>>>  _collector.reportError(exception);
>>>>> and everything started working!!!
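The heartbeat failure described in the first point can be illustrated with a deliberately simplified model. It assumes (this is not taken from Storm's code) that the spout's heartbeat is refreshed only when Storm exchanges a message with the subprocess (a next or an ack, each answered with sync), that Storm stops asking for nextTuple() once maxPending tuples are in flight, and that the bolt handles one tuple at a time. HeartbeatModel is hypothetical and counts time in one-second ticks.

```java
// Simplified, hypothetical model of a multilang spout's heartbeat;
// it is not Storm's implementation. Time advances in one-second ticks.
final class HeartbeatModel {
    /**
     * Returns the tick at which the heartbeat check would fire,
     * or -1 if it never fires within the given number of ticks.
     */
    static int timeoutTick(int maxPending, int boltSecondsPerTuple,
                           int timeoutSecs, int ticks) {
        int pending = 0;        // tuples emitted but not yet acked
        int lastHeartbeat = 0;  // last tick at which the spout answered Storm
        int boltFreeAt = 0;     // tick at which the bolt finishes its tuple
        boolean boltBusy = false;

        for (int t = 1; t <= ticks; t++) {
            // Bolt finishes a tuple: Storm forwards the ack, the spout syncs.
            if (boltBusy && t >= boltFreeAt) {
                boltBusy = false;
                pending--;
                lastHeartbeat = t;
            }
            // An idle bolt picks up the next in-flight tuple.
            if (!boltBusy && pending > 0) {
                boltBusy = true;
                boltFreeAt = t + boltSecondsPerTuple;
            }
            // Storm only calls nextTuple() while below maxPending.
            if (pending < maxPending) {
                pending++;
                lastHeartbeat = t;
            }
            // Heartbeat check: too long since the spout last answered.
            if (t - lastHeartbeat > timeoutSecs) {
                return t;
            }
        }
        return -1;
    }
}
```

With maxPending = 3, a 60-second bolt, and a 30-second timeout, the model fires at tick 34 (the spout last answered at tick 3); with a 5-second bolt, the acks keep the heartbeat fresh and it never fires. In this model, raising the timeout only helps if it exceeds the slowest bolt time.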
>>>>>
>>>>> Does this make *any* sense? Why would _collector.reportError(exception);
>>>>> block and never return? (I waited quite a long time, tens of minutes.)
>>>>> When I comment out line 234, Storm immediately kills my bad tasks and
>>>>> respawns them almost instantly.
>>>>>
>>>>> I'm fairly confident this is reproducible. My topology:
>>>>> -1 spout (ShellSpout)
>>>>> -1 bolt (ShellBolt)
>>>>> -The ShellSpout has a heartbeat timeout due to slow tasks in the ShellBolt
>>>>> plus a full pending queue
>>>>>
>>>>> Thanks for any feedback!
>>>>>
>>>>> will
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Alex Sobrino Beltrán
>>> Registered Linux User #273657
>>>
>>> http://v5tech.es
>>>
>>
>>
>>
>>
>
>
