I'm not sure what everything I've learned adds up to yet...

I tried setting up a local Storm development environment.  By mistake, I
forgot to switch to 0.9.3-branch (i.e. I was working on master at first).
On master, I saw the ShellBolt heartbeat fail first (which in retrospect
makes sense!).  Then I remembered I was trying to debug 0.9.3, so I
switched to 0.9.3-branch.  On the branch, I see the ShellSpout heartbeat
fail first and the topology jam up, so at least that reproduces the
problem (?).  In both cases I ran in local mode rather than cluster mode,
as I figured I had a better shot at debugging in local mode.

At this point, I figured I had a good test case.  So far this was all
using command line tools (git, mvn).  Two thoughts at this point:

1.) I don't plan on running master, and it's not 100% clear to me whether
ShellBolt failing first would solve any problems...

2.) On 0.9.3-branch, after ShellSpout fails and everything jams up, if I
hit Ctrl-C on the process I immediately see the ShellBolt heartbeat
timeout message.  It's as if it was waiting to write, but was blocked on
something (hrmmmm....?)

But I hit the limits of what I could learn from "System.out" debugging, so
I tried setting up Storm in IntelliJ.  That took a while, but I finally
figured out how to run a topology in local mode with a debugger.  Once
again, I saw the ShellSpout heartbeat fail and then nothing happen.

The next problem is that I don't know how to set up IntelliJ to understand
Clojure-compiled code (at least, I think that's my problem....), so the
"Step Into/Out" information is really weird in the debugger.  The
best/most complete stack trace I have is:
invoke():102, zookeeper$exists_node_QMARK_$fn__3279 (backtype.storm)
invoke():98, zookeeper$exists_node_QMARK_ (backtype.storm)
invoke():114, zookeeper$mkdirs (backtype.storm)
mkdirs():119, cluster$mk_distributed_cluster_state$reify__3526 (backtype.storm)
report_error():397, cluster$mk_storm_cluster_state$reify__3983 (backtype.storm)
invoke():180, executor$throttled_report_error_fn$fn__5548 (backtype.storm.daemon)
reportError():533, executor$fn__5700$fn$reify__5742 (backtype.storm.daemon)
reportError():132, SpoutOutputCollector (backtype.storm.spout)
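A thread dump is one way to recover a trace like this without depending on the debugger's step view. It shows every thread's state and full stack, including whichever executor thread is stuck inside reportError(). The following is a minimal sketch built on the JDK's java.lang.management API. ThreadDumper is a hypothetical helper name and is not part of Storm. Running the JDK's `jstack <pid>` against the worker process should give similar output from outside the JVM.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: prints every live thread's state and full stack, so a
// thread stuck inside a call (e.g. a blocking error report) becomes visible.
public final class ThreadDumper {
    private ThreadDumper() {}

    public static String dump() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // (true, true): also collect locked monitors and ownable synchronizers.
        for (ThreadInfo info : mx.dumpAllThreads(true, true)) {
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState()).append('\n');
            // Walk the frames ourselves; ThreadInfo.toString() truncates them.
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            if (info.getLockName() != null) {
                sb.append("    waiting on ").append(info.getLockName()).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

You could call ThreadDumper.dump() from a timer shortly after the "subprocess heartbeat timeout" error appears. The output should then show whether the thread that called reportError() is still parked in the ZooKeeper/Curator frames.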

I had a better stack trace (which I lost) that led into
org.apache.storm.curator.RetryLoop.callWithRetry(), which is my "prime
suspect" (based on the name alone) for whatever is blocking things up....
:-)
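Here is a rough illustration of why a name like callWithRetry is a reasonable suspect. A synchronous retry loop keeps its caller inside the call for the entire retry budget, so everything queued behind that caller stalls too. The sketch below is a generic exponential-backoff loop, not Curator's actual RetryLoop. callWithRetry, maxRetries and baseSleepMs are hypothetical names. Whether Storm's ZooKeeper client is configured with a budget anywhere near this large is an open question.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a synchronous retry loop with exponential backoff.
// The calling thread stays inside callWithRetry until an attempt succeeds or
// the retries run out, so any work waiting on that thread waits too.
public final class RetrySketch {
    private RetrySketch() {}

    public static <T> T callWithRetry(Supplier<T> op, int maxRetries, long baseSleepMs) {
        int attempt = 0;
        while (true) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                if (attempt >= maxRetries) {
                    throw e; // budget exhausted: give up and propagate
                }
                // Back off baseSleepMs, 2x, 4x, ... while blocking the caller.
                sleepQuietly(baseSleepMs << attempt);
                attempt++;
            }
        }
    }

    // Total backoff a caller can spend sleeping before the final failure.
    public static long worstCaseSleepMs(int maxRetries, long baseSleepMs) {
        long total = 0;
        for (int i = 0; i < maxRetries; i++) {
            total += baseSleepMs << i;
        }
        return total;
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during backoff", ie);
        }
    }
}
```

For example, worstCaseSleepMs(10, 1000) is 1,023,000 ms, or about 17 minutes of backoff before the last failure is rethrown. That figure does not include any time each attempt spends waiting on its own connection timeout.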

Though, once again, since I don't understand the big picture of Storm, I
still have no idea what all of the above adds up to in terms of what's
wrong or how to fix it....

will

On Wed, Feb 11, 2015 at 1:31 PM, William Oberman <[email protected]>
wrote:

> I'm glad to hear I'm not the only one!
>
> (no new news yet)
>
>
> On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <[email protected]> wrote:
>
>> Hi William,
>>
>> I'm having the same problem running a multilang topology (written in
>> Python). If you find a solution, please post it here; it will surely help us.
>>
>> To upgrade from 0.9.2-incubating we updated storm.py (
>> https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
>> and pom.xml.
>>
>> After downgrading to 0.9.2-incubating (downgrading storm.py and pom.xml),
>> it works perfectly.
>>
>> Best regards,
>>
>> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <
>> [email protected]> wrote:
>>
>>> I'm not sure of the best way to share a test case, so I'll copy and paste
>>> code below....  If you run the code below (and find the log file of the
>>> worker that was running it), you should see within ~30 seconds:
>>> ====
>>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>> =======
>>>
>>> But the topology then keeps running in a kind of weird zombie state forever.
>>> More specifically, I see the multilang bolt process all tuples in the
>>> pending queue, and then an infinite loop of nextTuple()/fail() from the
>>> multilang spout.  But, as noted in my original email, if I comment out
>>>  _collector.reportError(exception);
>>> in the Java ShellSpout, then the worker immediately dies and respawns.
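One reading of this is an ordering problem. Suppose the heartbeat task reports the error before it halts the worker, and the report never returns. Then the halt is never reached, which would match "comment out reportError and the worker dies and respawns immediately". The sketch below shows one hedged way to keep a slow report from blocking a shutdown path: run the report on a daemon thread and wait only a bounded time. DieSketch and reportWithDeadline are hypothetical names, and this is not Storm's own code or fix.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: give an error report a deadline so that whatever
// comes after it (e.g. halting the process) cannot be blocked forever.
public final class DieSketch {
    private DieSketch() {}

    // Runs report on a daemon thread and waits at most timeoutMs for it.
    // Returns true if the report finished in time, false otherwise.
    public static boolean reportWithDeadline(Runnable report, long timeoutMs) {
        ExecutorService ex = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "error-reporter");
            t.setDaemon(true); // a stuck report must not keep the JVM alive
            return t;
        });
        try {
            Future<?> f = ex.submit(report);
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // report is still hanging; caller proceeds anyway
        } catch (ExecutionException e) {
            return false; // report itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            ex.shutdownNow(); // interrupt the reporter if it is still running
        }
    }
}
```

In a die() path, a call such as reportWithDeadline(() -> collector.reportError(e), 5000) followed by an unconditional halt would still try to surface the error. A hung ZooKeeper write could then delay the halt by at most the deadline.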
>>>
>>> If no one can help, the next step for me is rough, as I'll have to learn
>>> how to actually develop and debug Storm itself, which is usually at least
>>> 10x harder than just using something :-)
>>>
>>> In any case, my test code:
>>>
>>> Topology = 1 process with two tasks (multilang spout and bolt) and a
>>> small pool of pending messages (yes, I used the word count example in
>>> storm-starter as a starting point....)
>>> =============
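>>> import backtype.storm.Config;
>>> import backtype.storm.LocalCluster;
>>> import backtype.storm.StormSubmitter;
>>> import backtype.storm.spout.ShellSpout;
>>> import backtype.storm.task.ShellBolt;
>>> import backtype.storm.topology.IRichBolt;
>>> import backtype.storm.topology.IRichSpout;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.TopologyBuilder;
>>> import backtype.storm.tuple.Fields;
>>>
>>> import java.util.Map;
>>>
>>> public class SlowTopology {
>>>   // Multilang bolt backed by slowBolt.php; sleeps 1-180 s per tuple.
>>>   public static class SlowPhpBolt extends ShellBolt implements IRichBolt {
>>>
>>>     public SlowPhpBolt() {
>>>       super("php", "slowBolt.php");
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>       declarer.declare(new Fields());
>>>     }
>>>
>>>     @Override
>>>     public Map<String, Object> getComponentConfiguration() {
>>>       return null;
>>>     }
>>>   }
>>>
>>>   // Multilang spout backed by slowSpout.php; emits at most one tuple per second.
>>>   public static class SlowPhpSpout extends ShellSpout implements IRichSpout {
>>>
>>>     public SlowPhpSpout() {
>>>       super("php", "slowSpout.php");
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>       ofd.declare(new Fields("output"));
>>>     }
>>>
>>>     @Override
>>>     public Map<String, Object> getComponentConfiguration() {
>>>       return null;
>>>     }
>>>   }
>>>
>>>   public static void main(String[] args) throws Exception {
>>>     TopologyBuilder builder = new TopologyBuilder();
>>>
>>>     // A small max-spout-pending (3) makes the pending pool fill up
>>>     // quickly while the bolt is busy sleeping.
>>>     builder.setSpout("spout", new SlowPhpSpout(), 1)
>>>         .setNumTasks(1)
>>>         .setMaxSpoutPending(3);
>>>     builder.setBolt("bolt", new SlowPhpBolt(), 1)
>>>         .setNumTasks(1)
>>>         .shuffleGrouping("spout");
>>>
>>>     Config conf = new Config();
>>>     conf.setDebug(true);
>>>
>>>     if (args != null && args.length > 0) {
>>>       // Cluster mode: one worker runs both tasks.
>>>       conf.setNumWorkers(1);
>>>       StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
>>>           builder.createTopology());
>>>     }
>>>     else {
>>>       // Local mode. Note: 10 s is shorter than the ~30 s it takes for the
>>>       // heartbeat timeout to fire, so lengthen this sleep to reproduce
>>>       // the problem locally.
>>>       LocalCluster cluster = new LocalCluster();
>>>       cluster.submitTopology("slow", conf, builder.createTopology());
>>>       Thread.sleep(10000);
>>>       cluster.shutdown();
>>>     }
>>>   }
>>> }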
>>> ===========
>>>
>>> slowSpout.php
>>> ==========
>>> <?php
>>> require_once "storm.php";
>>> class slowSpout extends \ShellSpout {
>>>   protected function nextTuple() {
>>>     $value = rand(0,100);
>>>     $id = rand(0, 100);
>>>     $this->emit(array($value), $id);
>>>     file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>>>     sleep(1);
>>>   }
>>>   protected function ack($id) {
>>>     file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
>>>   }
>>>
>>>   protected function fail($id) {
>>>     file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
>>>   }
>>> }
>>>
>>> (new slowSpout())->run();
>>> ===========
>>>
>>> slowBolt.php
>>> ============
>>> <?php
>>> require_once "storm.php";
>>> class slowBolt extends \BasicBolt {
>>>   protected function process(\Tuple $t) {
>>>     $sleep = rand(1, 180);
>>>     file_put_contents("/tmp/storm_slow.log", "process(".print_r($t, true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
>>>     sleep($sleep);
>>>   }
>>> }
>>> (new slowBolt())->run();
>>> ============
>>>
>>> storm.php (based on https://github.com/lazyshot/storm-php; I believe I
>>> added more error checking on reads/writes to standard in/out, a sync()
>>> helper so new ShellSpout classes are easier to write, and support for the
>>> new heartbeat protocol)
>>> =========
>>> <?php
>>> interface iShellBolt {
>>> }
>>>
>>> interface iShellSpout {
>>> }
>>>
>>> class Tuple {
>>>     public $id, $component, $stream, $task, $values;
>>>
>>>     public function __construct($id, $component, $stream, $task, $values) {
>>>         $this->id = $id;
>>>         $this->component = $component;
>>>         $this->stream = $stream;
>>>         $this->task = $task;
>>>         $this->values = $values;
>>>     }
>>> }
>>>
>>> abstract class ShellComponent {
>>>     protected $pid;
>>>     protected $stormConf;
>>>     protected $topologyContext;
>>>
>>>     protected $stormInc = null;
>>>
>>>     public function __construct() {
>>>         $this->pid = getmypid();
>>>         $this->sendCommand(array("pid" => $this->pid));
>>>
>>>         $handshake = $this->parseMessage($this->waitForMessage());
>>>
>>>         $this->stormConf = $handshake['conf'];
>>>         $this->topologyContext = $handshake['context'];
>>>         $pidDir = $handshake['pidDir'];
>>>
>>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>>>     }
>>>
>>>     protected function readLine() {
>>>         $raw = fgets(STDIN);
>>>
>>>         if ($raw === false) {
>>>             throw new Exception("STDIN is broken");
>>>         }
>>>
>>>         $line = trim($raw);
>>>
>>>         return $line;
>>>     }
>>>
>>>     protected function waitForMessage() {
>>>         $message = '';
>>>         while (true) {
>>>             $line = trim($this->readLine());
>>>
>>>             if (strlen($line) == 0) {
>>>                 continue;
>>>             } else if ($line == 'end') {
>>>                 break;
>>>             } else if ($line == 'sync') {
>>>                 $message = '';
>>>                 continue;
>>>             }
>>>
>>>             $message .= $line . "\n";
>>>         }
>>>
>>>         return trim($message);
>>>     }
>>>
>>>     protected function sendCommand(array $command) {
>>>         $this->sendMessage(json_encode($command));
>>>     }
>>>
>>>     protected function sendLog($message) {
>>>         return $this->sendCommand(array(
>>>             'command' => 'log',
>>>             'msg' => $message
>>>         ));
>>>     }
>>>
>>>     protected function parseMessage($message) {
>>>         $msg = json_decode($message, true);
>>>
>>>         if ($msg) {
>>>             return $msg;
>>>         } else {
>>>             return $message;
>>>         }
>>>     }
>>>
>>>     protected function sendMessage($message) {
>>>         $message = "$message\nend\n";
>>>         $bytesWritten = fwrite(STDOUT, $message);
>>>         fflush(STDOUT);
>>>         if ($bytesWritten === false) {
>>>             throw new Exception("STDOUT is broken");
>>>         }
>>>         if ($bytesWritten != strlen($message)) {
>>>             throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
>>>         }
>>>     }
>>>
>>>     final protected function sync() {
>>>         $command = array(
>>>             'command' => 'sync',
>>>         );
>>>
>>>         $this->sendCommand($command);
>>>     }
>>>
>>> }
>>>
>>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>>>
>>>     public $anchor_tuple = null;
>>>
>>>     public function __construct() {
>>>         parent::__construct();
>>>
>>>         $this->init($this->stormConf, $this->topologyContext);
>>>     }
>>>
>>>     public function run() {
>>>         try {
>>>             while (true) {
>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>
>>>                 if (is_array($command)) {
>>>                     if (isset($command['tuple'])) {
>>>                         $tupleMap = array_merge(array(
>>>                                 'id' => null,
>>>                                 'comp' => null,
>>>                                 'stream' => null,
>>>                                 'task' => null,
>>>                                 'tuple' => null
>>>                             ),
>>>
>>>                             $command);
>>>
>>>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>>>                             $this->sync();
>>>                         } else {
>>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'],
>>>                                 $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
>>>                             $this->process($tuple);
>>>                         }
>>>                     }
>>>                 }
>>>             }
>>>         } catch (Exception $e) {
>>>             $this->sendLog((string)$e);
>>>         }
>>>     }
>>>
>>>     abstract protected function process(Tuple $tuple);
>>>
>>>     protected function init($conf, $topology) {
>>>         return;
>>>     }
>>>
>>>     protected function emitTuple(array $tuple, $stream = null, $anchors = array(), $directTask = null) {
>>>         if ($this->anchor_tuple !== null) {
>>>             $anchors = array($this->anchor_tuple);
>>>         }
>>>
>>>         $command = array(
>>>             'command' => 'emit'
>>>         );
>>>
>>>         if ($stream !== null) {
>>>             $command['stream'] = $stream;
>>>         }
>>>
>>>         $command['anchors'] = array_map(function ($a) {
>>>             return $a->id;
>>>         }, $anchors);
>>>
>>>         if ($directTask !== null) {
>>>             $command['task'] = $directTask;
>>>         }
>>>
>>>         $command['tuple'] = $tuple;
>>>
>>>         $this->sendCommand($command);
>>>     }
>>>
>>>     protected function emit($tuple, $stream = null, $anchors = array()) {
>>>         $this->emitTuple($tuple, $stream, $anchors);
>>>     }
>>>
>>>     protected function emitDirect($directTask, $tuple, $stream = null, $anchors = array()) {
>>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
>>>     }
>>>
>>>     protected function ack(Tuple $tuple) {
>>>         $command = array(
>>>             'command' => 'ack',
>>>             'id' => $tuple->id
>>>         );
>>>
>>>         $this->sendCommand($command);
>>>     }
>>>
>>>     protected function fail(Tuple $tuple) {
>>>         $command = array(
>>>             'command' => 'fail',
>>>             'id' => $tuple->id
>>>         );
>>>
>>>         $this->sendCommand($command);
>>>     }
>>> }
>>>
>>> abstract class BasicBolt extends ShellBolt {
>>>     public function run() {
>>>         try {
>>>             while (true) {
>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>
>>>                 if (is_array($command)) {
>>>                     if (isset($command['tuple'])) {
>>>                         $tupleMap = array_merge(array(
>>>                                 'id' => null,
>>>                                 'comp' => null,
>>>                                 'stream' => null,
>>>                                 'task' => null,
>>>                                 'tuple' => null
>>>                             ),
>>>
>>>                             $command);
>>>
>>>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>>>                             $this->sync();
>>>                         } else {
>>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'],
>>>                                 $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
>>>
>>>                             $this->anchor_tuple = $tuple;
>>>
>>>                             try {
>>>                                 $processed = $this->process($tuple);
>>>
>>>                                 $this->ack($tuple);
>>>                             } catch (BoltProcessException $e) {
>>>                                 $this->fail($tuple);
>>>                             }
>>>                         }
>>>                     }
>>>                 }
>>>             }
>>>         } catch (Exception $e) {
>>>             $this->sendLog((string)$e);
>>>         }
>>>
>>>     }
>>> }
>>>
>>> abstract class ShellSpout extends ShellComponent implements iShellSpout {
>>>     protected $tuples = array();
>>>
>>>     public function __construct() {
>>>         parent::__construct();
>>>
>>>         $this->init($this->stormConf, $this->topologyContext);
>>>     }
>>>
>>>
>>>     abstract protected function nextTuple();
>>>
>>>     abstract protected function ack($tuple_id);
>>>
>>>     abstract protected function fail($tuple_id);
>>>
>>>     public function run() {
>>>         try {
>>>             while (true) {
>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>
>>>                 if (is_array($command)) {
>>>                     if (isset($command['command'])) {
>>>                         if ($command['command'] == 'ack') {
>>>                             $this->ack($command['id']);
>>>                             $this->sync();
>>>                         } else if ($command['command'] == 'fail') {
>>>                             $this->fail($command['id']);
>>>                             $this->sync();
>>>                         } else if ($command['command'] == 'next') {
>>>                             $this->nextTuple();
>>>                             $this->sync();
>>>                         }
>>>                     }
>>>                 }
>>>             }
>>>         } catch (Exception $e) {
>>>             $this->sendLog((string)$e);
>>>             $this->sync();
>>>         }
>>>     }
>>>
>>>     protected function init($stormConf, $topologyContext) {
>>>         return;
>>>     }
>>>
>>>     final protected function emit(array $tuple, $messageId = null, $streamId = null) {
>>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
>>>     }
>>>
>>>     final protected function emitDirect($directTask, array $tuple, $messageId = null, $streamId = null) {
>>>         return $this->emitTuple($tuple, $messageId, $streamId, $directTask);
>>>     }
>>>
>>>     private function emitTuple(array $tuple, $messageId = null, $streamId = null, $directTask = null) {
>>>         $command = array(
>>>             'command' => 'emit'
>>>         );
>>>
>>>         if ($messageId !== null) {
>>>             $command['id'] = $messageId;
>>>         }
>>>
>>>         if ($streamId !== null) {
>>>             $command['stream'] = $streamId;
>>>         }
>>>
>>>         if ($directTask !== null) {
>>>             $command['task'] = $directTask;
>>>         }
>>>
>>>         $command['tuple'] = $tuple;
>>>
>>>         return $this->sendCommand($command);
>>>     }
>>> }
>>>
>>> class BoltProcessException extends Exception {
>>> }
>>>
>>> =========================
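The storm.php code above frames every message as a JSON payload followed by a line containing only `end`. The PHP spout answers each ack/fail/next with a `sync` command. A minimal Java sketch of that line framing might be useful, for example, in a small harness that talks to slowSpout.php directly to check whether it keeps syncing. MultilangFraming is a hypothetical class name. JSON parsing and the initial handshake are deliberately left out.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper mirroring storm.php's sendMessage()/waitForMessage():
// a message is one or more lines of JSON followed by a line "end".
public final class MultilangFraming {
    private MultilangFraming() {}

    // Wraps a JSON payload the same way sendMessage() does.
    public static String frame(String json) {
        return json + "\nend\n";
    }

    // Reads lines until "end", skipping blank lines as waitForMessage() does.
    // Returns null if the stream closes before a complete message arrives.
    public static String readMessage(BufferedReader in) {
        StringBuilder msg = new StringBuilder();
        try {
            String line;
            while ((line = in.readLine()) != null) {
                line = line.trim();
                if (line.isEmpty()) {
                    continue;
                }
                if (line.equals("end")) {
                    return msg.toString().trim();
                }
                msg.append(line).append('\n');
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return null;
    }
}
```

After the handshake, a harness could write frame("{\"command\":\"next\"}") to the PHP process and call readMessage() until a {"command":"sync"} reply arrives. That is roughly the exchange the multilang spout protocol expects on each nextTuple().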
>>>
>>>
>>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <
>>> [email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>>>>
>>>> I'll try to cover the important facts that led to this issue:
>>>>
>>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some
>>>> existing business logic
>>>>
>>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition
>>>> to the ShellBolt protocol)
>>>>
>>>> -I have some odd topologies where I try to do some legacy background
>>>> processing.  This processing takes a highly variable amount of time in the
>>>> bolts, from milliseconds to minutes.  Eventually, due to randomness, the
>>>> spout's "pending" pool fills up, causing the spout to block on nextTuple,
>>>> which eventually causes a heartbeat timeout.  (I believe my only fix is to
>>>> increase the heartbeat timeout at the topology level.  That's not the
>>>> purpose of this email, though confirmation that this is my only workaround
>>>> would be appreciated!  I feel like this wasn't anticipated when the
>>>> heartbeat patch was designed; I guess it was assumed that the spout's
>>>> nextTuple wouldn't block?)
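The timeout described here can be modeled as a periodic check of how long it has been since the subprocess last said anything. This is a simplified sketch. It assumes, as the reports in this thread suggest, that the Java side refreshes the heartbeat only when the subprocess sends a message such as sync. Once max-spout-pending is reached, no next is sent, and no ack/fail arrives while the bolt sleeps. In that window nothing refreshes the timestamp. HeartbeatMonitor and its methods are hypothetical, not Storm's ShellSpout code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of a subprocess heartbeat check.
public final class HeartbeatMonitor {
    private final long timeoutMs;
    private final AtomicLong lastHeartbeatMs;

    public HeartbeatMonitor(long timeoutMs, long nowMs) {
        this.timeoutMs = timeoutMs;
        this.lastHeartbeatMs = new AtomicLong(nowMs);
    }

    // Called whenever a message (e.g. "sync") arrives from the subprocess.
    public void onMessage(long nowMs) {
        lastHeartbeatMs.set(nowMs);
    }

    // Called periodically by a timer thread; true means "declare it dead".
    public boolean isTimedOut(long nowMs) {
        return nowMs - lastHeartbeatMs.get() > timeoutMs;
    }
}
```

With a 30 s timeout, a spout that last synced at t = 0 is declared dead at any check after t = 30 s if every pending slot is held by a bolt sleeping up to 180 s, even though the PHP process is healthy. Raising the timeout only helps if it exceeds the longest stretch without traffic. If I recall correctly, 0.9.3 derives this timeout from supervisor.worker.timeout.secs in the topology config. Treat that as an assumption and check ShellSpout.open() in the 0.9.3 source.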
>>>>
>>>> -The purpose of this email is the fact that the topology "jams up" when
>>>> the ShellSpout has a heartbeat timeout.  I can see my PHP spout/bolt still
>>>> running (I added logging to them), but Storm itself is doing nothing.
>>>>
>>>> -I added logging to ShellSpout and recompiled, because I saw the log
>>>> message on line 233 (Halting process: ShellSpout died), but as noted the
>>>> PHP process was still running, so I was curious whether _process.destroy();
>>>> failed.  But my logging didn't appear.  I assumed I was compiling/deploying
>>>> wrong.  Eventually I commented out line 234, _collector.reportError(exception);,
>>>> and everything started working!!!
>>>>
>>>> Does this make *any* sense?  Why would
>>>> _collector.reportError(exception); block and never return?  (I waited quite
>>>> a long time, tens of minutes.)  When I comment out line 234, Storm
>>>> immediately kills my bad tasks and respawns them almost instantly.
>>>>
>>>> I feel fairly confident that this will be reproducible.  My topology:
>>>> -1 spout (ShellSpout)
>>>> -1 bolt (ShellBolt)
>>>> -The ShellSpout has a heartbeat timeout due to slow tasks in ShellBolt
>>>> + the pending queue is full
>>>>
>>>> Thanks for any feedback!
>>>>
>>>> will
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>> --
>> Alex Sobrino Beltrán
>> Registered Linux User #273657
>>
>> http://v5tech.es
>>
>
>
>
>
