I'm not sure yet what my findings add up to....

I set up a local Storm development environment. By mistake, I started out on master instead of 0.9.3-branch. On master, I was seeing the ShellBolt heartbeat fail first (which in retrospect makes sense!). Then I remembered I was trying to debug 0.9.3, so I switched to 0.9.3-branch. On the branch, I see the ShellSpout heartbeat fail first and everything jam up, so that's good (?). In both cases I ran in local mode rather than cluster mode, as I figured I had a better shot at debugging in local mode.
At this point, I figured I had a good test case. This was all with command-line tools (git, mvn) so far. Two thoughts:

1.) I don't plan on running master, and it's not 100% clear to me whether ShellBolt failing first would solve any problems...

2.) In 0.9.3-branch, after ShellSpout fails and everything jams up, if I Ctrl-C the process I immediately see the ShellBolt heartbeat timeout message. It's as if it was waiting to write, but was blocked on something (hrmmmm....?)

But I hit the limits of what I could learn from "System.out" debugging, so I tried setting up Storm in IntelliJ. That took a bit, but I finally figured out how to run a topology in local mode under a debugger. Once again, I saw the ShellSpout heartbeat fail and then nothing happen.

The next problem is that I don't know how to set up IntelliJ to understand Clojure-compiled code (at least, I think that's my problem....), so the "Step Into/Out of" information in the debugger is really weird. The best/most complete stack trace I have is:

invoke():102, zookeeper$exists_node_QMARK_$fn__3279 (backtype.storm)
invoke():98, zookeeper$exists_node_QMARK_ (backtype.storm)
invoke():114, zookeeper$mkdirs (backtype.storm)
mkdirs():119, cluster$mk_distributed_cluster_state$reify__3526 (backtype.storm)
report_error():397, cluster$mk_storm_cluster_state$reify__3983 (backtype.storm)
invoke():180, executor$throttled_report_error_fn$fn__5548 (backtype.storm.daemon)
reportError():533, executor$fn__5700$fn$reify__5742 (backtype.storm.daemon)
reportError():132, SpoutOutputCollector (backtype.storm.spout)

I had a better stack trace (that I lost) that led into:

org.apache.storm.curator.RetryLoop.callWithRetry()

which is my "prime suspect" (based on name alone) for whatever is blocking things up.... :-)

Though, once again, since I don't understand the big picture of Storm, I still have no idea what all of the above adds up to in terms of what's wrong and how to fix it....
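Since stepping through the Clojure-compiled frames is awkward, one way to see what every thread is blocked on is a full thread dump. Below is a minimal sketch in plain Java (8 or later), not Storm code: the class name ThreadDumpSketch, the method dumpAllThreads and the thread name used in main are made up for illustration, while Thread.getAllStackTraces() is standard JDK. Running the JDK's jstack tool against the worker's pid should give the same kind of information from outside the process.

```java
import java.util.Map;
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper (not part of Storm): formats the stack of every live thread.
final class ThreadDumpSketch {

    static String dumpAllThreads() {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            // Header line: thread name plus its current state (RUNNABLE, WAITING, ...).
            out.append('"').append(thread.getName()).append("\" state=")
               .append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                out.append("    at ").append(frame).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a thread that is stuck inside a blocking call.
        Thread stuck = new Thread(() -> {
            while (true) {
                LockSupport.park();
            }
        }, "pretend-heartbeat-timer");
        stuck.setDaemon(true);
        stuck.start();

        Thread.sleep(100); // give the thread time to park
        System.out.println(dumpAllThreads());
    }
}
```

Calling something like dumpAllThreads() from a spare thread a few seconds after the "Halting process" log line would show whether the heartbeat timer thread is still sitting inside reportError(), and in which frame.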
will

On Wed, Feb 11, 2015 at 1:31 PM, William Oberman <[email protected]> wrote:

> I'm glad to hear I'm not the only one!
>
> (no new news yet)
>
>
> On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <[email protected]> wrote:
>
>> Hi William,
>>
>> I'm having the same problem running a multilang topology (written in
>> Python). If you find a solution, please post it here; it will surely
>> help us.
>>
>> To upgrade from 0.9.2-incubating we updated storm.py (
>> https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
>> and pom.xml.
>>
>> Downgrading to 0.9.2-incubating (downgrading storm.py and pom.xml), it
>> works like hell.
>>
>> Best regards,
>>
>> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <
>> [email protected]> wrote:
>>
>>> I'm not sure of the best way to share a test case. I'll copy and paste
>>> code below.... If you run the code below (and find the log file of the
>>> worker that was running it), you should see in ~30 seconds:
>>> ====
>>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>>> =======
>>>
>>> But the topology will run in a kind of weird zombie state forever.
>>> More specifically, I see the multilang bolt process all tuples in the
>>> pending queue, and then an infinite loop of nextTuple()/fail() from the
>>> multilang spout. But, as noted in my original email, if I comment out:
>>> _collector.reportError(exception);
>>> in the Java ShellSpout, then the worker will immediately die and respawn.
>>>
>>> If no one can help, the next step for me is rough, as I'll have to learn
>>> how to actually develop and debug Storm itself, which is usually at least
>>> 10x harder than just using something :-)
>>>
>>> In any case, my test code:
>>>
>>> Topology = 1 process with two tasks (multilang spout and bolt), and a
>>> small pool of pending messages (yes, using the word count example in
>>> storm-starter as a starting point....)
>>> =============
>>> public class SlowTopology {
>>>     public static class SlowPhpBolt extends ShellBolt implements IRichBolt {
>>>
>>>         public SlowPhpBolt() {
>>>             super("php", "slowBolt.php");
>>>         }
>>>
>>>         @Override
>>>         public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>             declarer.declare(new Fields());
>>>         }
>>>
>>>         @Override
>>>         public Map<String, Object> getComponentConfiguration() {
>>>             return null;
>>>         }
>>>     }
>>>
>>>     public static class SlowPhpSpout extends ShellSpout implements IRichSpout {
>>>
>>>         public SlowPhpSpout() {
>>>             super("php", "slowSpout.php");
>>>         }
>>>
>>>         @Override
>>>         public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>             ofd.declare(new Fields("output"));
>>>         }
>>>
>>>         @Override
>>>         public Map<String, Object> getComponentConfiguration() {
>>>             return null;
>>>         }
>>>     }
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         TopologyBuilder builder = new TopologyBuilder();
>>>
>>>         builder.setSpout("spout", new SlowPhpSpout(), 1).setNumTasks(1).setMaxSpoutPending(3);
>>>         builder.setBolt("bolt", new SlowPhpBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
>>>
>>>         Config conf = new Config();
>>>         conf.setDebug(true);
>>>
>>>         if (args != null && args.length > 0) {
>>>             conf.setNumWorkers(1);
>>>             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
>>>         }
>>>         else {
>>>             LocalCluster cluster = new LocalCluster();
>>>             cluster.submitTopology("slow", conf, builder.createTopology());
>>>             Thread.sleep(10000);
>>>             cluster.shutdown();
>>>         }
>>>     }
>>> }
>>> ===========
>>>
>>> slowSpout.php
>>> ==========
>>> <?php
>>> require_once "storm.php";
>>> class slowSpout extends \ShellSpout {
>>>     protected function nextTuple() {
>>>         $value = rand(0,100);
>>>         $id = rand(0, 100);
>>>         $this->emit(array($value), $id);
>>>         file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>>>         sleep(1);
>>>     }
>>>     protected function ack($id) {
>>>         file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
>>>     }
>>>
>>>     protected function fail($id) {
>>>         file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
>>>     }
>>> }
>>>
>>> (new slowSpout())->run();
>>> ===========
>>>
>>> slowBolt.php
>>> ============
>>> <?php
>>> require_once "storm.php";
>>> class slowBolt extends \BasicBolt {
>>>     protected function process(\Tuple $t) {
>>>         $sleep = rand(1, 180);
>>>         file_put_contents("/tmp/storm_slow.log", "process(".print_r($t, true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
>>>         sleep($sleep);
>>>     }
>>> }
>>> (new slowBolt())->run();
>>> ============
>>>
>>> storm.php (from https://github.com/lazyshot/storm-php, and I think I
>>> added more error checking on reads/writes to standard in/out, added sync()
>>> to the ShellSpout to make new classes easier to write, and the new
>>> heartbeat protocol)
>>> =========
>>> <?php
>>> interface iShellBolt {
>>> }
>>>
>>> interface iShellSpout {
>>> }
>>>
>>> class Tuple {
>>>     public $id, $component, $stream, $task, $values;
>>>
>>>     public function __construct($id, $component, $stream, $task, $values) {
>>>         $this->id = $id;
>>>         $this->component = $component;
>>>         $this->stream = $stream;
>>>         $this->task = $task;
>>>         $this->values = $values;
>>>     }
>>> }
>>>
>>> abstract class ShellComponent {
>>>     protected $pid;
>>>     protected $stormConf;
>>>     protected $topologyContext;
>>>
>>>     protected $stormInc = null;
>>>
>>>     public function __construct() {
>>>         $this->pid = getmypid();
>>>         $this->sendCommand(array("pid" => $this->pid));
>>>
>>>         $handshake = $this->parseMessage($this->waitForMessage());
>>>
>>>         $this->stormConf = $handshake['conf'];
>>>         $this->topologyContext = $handshake['context'];
>>>         $pidDir = $handshake['pidDir'];
>>>
>>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>>>     }
>>>
>>>     protected function readLine() {
>>>         $raw = fgets(STDIN);
>>>
>>>         if ($raw === false) {
>>>             throw new Exception("STDIN is broken");
>>>         }
>>>
>>>         $line = trim($raw);
>>>
>>>         return $line;
>>>     }
>>>
>>>     protected function waitForMessage() {
>>>         $message = '';
>>>         while (true) {
>>>             $line = trim($this->readLine());
>>>
>>>             if (strlen($line) == 0) {
>>>                 continue;
>>>             } else if ($line == 'end') {
>>>                 break;
>>>             } else if ($line == 'sync') {
>>>                 $message = '';
>>>                 continue;
>>>             }
>>>
>>>             $message .= $line . "\n";
>>>         }
>>>
>>>         return trim($message);
>>>     }
>>>
>>>     protected function sendCommand(array $command) {
>>>         $this->sendMessage(json_encode($command));
>>>     }
>>>
>>>     protected function sendLog($message) {
>>>         return $this->sendCommand(array(
>>>             'command' => 'log',
>>>             'msg' => $message
>>>         ));
>>>     }
>>>
>>>     protected function parseMessage($message) {
>>>         $msg = json_decode($message, true);
>>>
>>>         if ($msg) {
>>>             return $msg;
>>>         } else {
>>>             return $message;
>>>         }
>>>     }
>>>
>>>     protected function sendMessage($message) {
>>>         $message = "$message\nend\n";
>>>         $bytesWritten = fwrite(STDOUT, $message);
>>>         fflush(STDOUT);
>>>         if ($bytesWritten === false) {
>>>             throw new Exception("STDOUT is broken");
>>>         }
>>>         if ($bytesWritten != strlen($message)) {
>>>             throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
>>>         }
>>>     }
>>>
>>>     final protected function sync() {
>>>         $command = array(
>>>             'command' => 'sync',
>>>         );
>>>
>>>         $this->sendCommand($command);
>>>     }
>>>
>>> }
>>>
>>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>>>
>>>     public $anchor_tuple = null;
>>>
>>>     public function __construct() {
>>>         parent::__construct();
>>>
>>>         $this->init($this->stormConf, $this->topologyContext);
>>>     }
>>>
>>>     public function run() {
>>>         try {
>>>             while (true) {
>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>
>>>                 if (is_array($command)) {
>>>                     if (isset($command['tuple'])) {
>>>                         $tupleMap = array_merge(array(
>>>                                 'id' => null,
>>>                                 'comp' => null,
>>>                                 'stream' => null,
>>>                                 'task' => null,
>>>                                 'tuple' => null
>>>                             ),
>>>                             $command);
>>>
>>>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>>>                             $this->sync();
>>>                         } else {
>>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'],
>>>                                 $tupleMap['task'], $tupleMap['tuple']);
>>>                             $this->process($tuple);
>>>                         }
>>>                     }
>>>                 }
>>>             }
>>>         } catch (Exception $e) {
>>>             $this->sendLog((string)$e);
>>>         }
>>>     }
>>>
>>>     abstract protected function process(Tuple $tuple);
>>>
>>>     protected function init($conf, $topology) {
>>>         return;
>>>     }
>>>
>>>     protected function emitTuple(array $tuple, $stream = null, $anchors = array(), $directTask = null) {
>>>         if ($this->anchor_tuple !== null) {
>>>             $anchors = array($this->anchor_tuple);
>>>         }
>>>
>>>         $command = array(
>>>             'command' => 'emit'
>>>         );
>>>
>>>         if ($stream !== null) {
>>>             $command['stream'] = $stream;
>>>         }
>>>
>>>         $command['anchors'] = array_map(function ($a) {
>>>             return $a->id;
>>>         }, $anchors);
>>>
>>>         if ($directTask !== null) {
>>>             $command['task'] = $directTask;
>>>         }
>>>
>>>         $command['tuple'] = $tuple;
>>>
>>>         $this->sendCommand($command);
>>>     }
>>>
>>>     protected function emit($tuple, $stream = null, $anchors = array()) {
>>>         $this->emitTuple($tuple, $stream, $anchors);
>>>     }
>>>
>>>     protected function emitDirect($directTask, $tuple, $stream = null, $anchors = array()) {
>>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
>>>     }
>>>
>>>     protected function ack(Tuple $tuple) {
>>>         $command = array(
>>>             'command' => 'ack',
>>>             'id' => $tuple->id
>>>         );
>>>
>>>         $this->sendCommand($command);
>>>     }
>>>
>>>     protected function fail(Tuple $tuple) {
>>>         $command = array(
>>>             'command' => 'fail',
>>>             'id' => $tuple->id
>>>         );
>>>
>>>         $this->sendCommand($command);
>>>     }
>>> }
>>>
>>> abstract class BasicBolt extends ShellBolt {
>>>     public function run() {
>>>         try {
>>>             while (true) {
>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>
>>>                 if (is_array($command)) {
>>>                     if (isset($command['tuple'])) {
>>>                         $tupleMap = array_merge(array(
>>>                                 'id' => null,
>>>                                 'comp' => null,
>>>                                 'stream' => null,
>>>                                 'task' => null,
>>>                                 'tuple' => null
>>>                             ),
>>>                             $command);
>>>
>>>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>>>                             $this->sync();
>>>                         } else {
>>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'],
>>>                                 $tupleMap['task'], $tupleMap['tuple']);
>>>
>>>                             $this->anchor_tuple = $tuple;
>>>
>>>                             try {
>>>                                 $processed = $this->process($tuple);
>>>
>>>                                 $this->ack($tuple);
>>>                             } catch (BoltProcessException $e) {
>>>                                 $this->fail($tuple);
>>>                             }
>>>                         }
>>>                     }
>>>                 }
>>>             }
>>>         } catch (Exception $e) {
>>>             $this->sendLog((string)$e);
>>>         }
>>>
>>>     }
>>> }
>>>
>>> abstract class ShellSpout extends ShellComponent implements iShellSpout {
>>>     protected $tuples = array();
>>>
>>>     public function __construct() {
>>>         parent::__construct();
>>>
>>>         $this->init($this->stormConf, $this->topologyContext);
>>>     }
>>>
>>>
>>>     abstract protected function nextTuple();
>>>
>>>     abstract protected function ack($tuple_id);
>>>
>>>     abstract protected function fail($tuple_id);
>>>
>>>     public function run() {
>>>         try {
>>>             while (true) {
>>>                 $command = $this->parseMessage($this->waitForMessage());
>>>
>>>                 if (is_array($command)) {
>>>                     if (isset($command['command'])) {
>>>                         if ($command['command'] == 'ack') {
>>>                             $this->ack($command['id']);
>>>                             $this->sync();
>>>                         } else if ($command['command'] == 'fail') {
>>>                             $this->fail($command['id']);
>>>                             $this->sync();
>>>                         } else if ($command['command'] == 'next') {
>>>                             $this->nextTuple();
>>>                             $this->sync();
>>>                         }
>>>                     }
>>>                 }
>>>             }
>>>         } catch (Exception $e) {
>>>             $this->sendLog((string)$e);
>>>             $this->sync();
>>>         }
>>>     }
>>>
>>>     protected function init($stormConf, $topologyContext) {
>>>         return;
>>>     }
>>>
>>>     final protected function emit(array $tuple, $messageId = null, $streamId = null) {
>>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
>>>     }
>>>
>>>     final protected function emitDirect($directTask, array $tuple, $messageId = null, $streamId = null) {
>>>         return $this->emitTuple($tuple, $messageId, $streamId, $directTask);
>>>     }
>>>
>>>     final private function emitTuple(array $tuple, $messageId = null, $streamId = null, $directTask = null) {
>>>         $command = array(
>>>             'command' => 'emit'
>>>         );
>>>
>>>         if ($messageId !== null) {
>>>             $command['id'] = $messageId;
>>>         }
>>>
>>>         if ($streamId !== null) {
>>>             $command['stream'] = $streamId;
>>>         }
>>>
>>>         if ($directTask !== null) {
>>>             $command['task'] = $directTask;
>>>         }
>>>
>>>         $command['tuple'] = $tuple;
>>>
>>>         return $this->sendCommand($command);
>>>     }
>>> }
>>>
>>> class BoltProcessException extends Exception {
>>> }
>>>
>>> =========================
>>>
>>>
>>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <
>>> [email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>>>>
>>>> I'll try to cover the important facts that led to this issue:
>>>>
>>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some
>>>> existing business logic
>>>>
>>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition
>>>> to the ShellBolt protocol)
>>>>
>>>> -I have some odd topologies where I try to do some legacy background
>>>> processing. This processing takes a highly variable amount of time in
>>>> the Bolts, from milliseconds to minutes. But eventually, due to
>>>> randomness, the spout's "pending" pool fills up, causing the spout to
>>>> block on nextTuple, which eventually causes a heartbeat timeout. (I
>>>> believe my only fix is to increase the heartbeat timeout at the
>>>> topology level. That's not the purpose of this email, though
>>>> confirmation of this as my only workaround would be appreciated! I
>>>> feel like this wasn't anticipated when the heartbeat patch was
>>>> designed, as it was assumed the spout's nextTuple wouldn't block, I
>>>> guess?)
>>>>
>>>> -The purpose of this email is the fact that the topology "jams up" when
>>>> the ShellSpout has a heartbeat timeout. I can see my PHP spout/bolt still
>>>> running (I added logging to them), but Storm itself is doing nothing.
>>>>
>>>> -I added logging to ShellSpout and recompiled, because I saw the log
>>>> message on line 233 (Halting process: ShellSpout died) but, as noted,
>>>> the PHP process was still running, so I was curious whether
>>>> _process.destroy(); had failed. But my logging didn't appear. I
>>>> assumed I was compiling/deploying wrong. Eventually I commented out
>>>> line 234: _collector.reportError(exception);
>>>> and everything started working!!!
>>>>
>>>> Does this make *any* sense? Why would
>>>> _collector.reportError(exception); block and never return? (I waited
>>>> quite a long time, tens of minutes.) When I comment out line 234, Storm
>>>> immediately kills my bad tasks and respawns almost instantly.
>>>>
>>>> I feel fairly confident that this will be reproducible. My topology:
>>>> -1 spout (ShellSpout)
>>>> -1 bolt (ShellBolt)
>>>> -The ShellSpout has a heartbeat timeout due to slow tasks in ShellBolt
>>>> + the pending queue is full
>>>>
>>>> Thanks for any feedback!
>>>>
>>>> will
>>>>
>>>
>>
>>
>> --
>> Alex Sobrino Beltrán
>> Registered Linux User #273657
>>
>> http://v5tech.es
>>
>

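On the question in the original message of why commenting out _collector.reportError(exception); un-jams things: if that call never returns, nothing sequenced after it (my extra logging, _process.destroy()) can run. That can be illustrated outside Storm. The following is only a sketch in plain Java (8 or later), assuming the cleanup really is sequenced after the report call; BlockingReportSketch and reportWithTimeout are hypothetical names, not Storm API, and the "stuck" Runnable merely stands in for a call that never returns. It shows one generic way to bound such a call so the code after it is still reached; whether that would be the right fix inside ShellSpout is a separate question.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration (not Storm code): bound a possibly-blocking report call.
final class BlockingReportSketch {

    /** Runs the report on a helper thread; returns true only if it completed normally in time. */
    static boolean reportWithTimeout(Runnable report, long timeoutMillis) {
        ExecutorService reporter = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "error-reporter");
            t.setDaemon(true); // a stuck reporter must not keep the JVM alive
            return t;
        });
        Future<?> pending = reporter.submit(report);
        try {
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            pending.cancel(true); // interrupt the stuck call and move on
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pending.cancel(true);
            return false;
        } catch (ExecutionException e) {
            return false; // the report itself threw; the caller still continues
        } finally {
            reporter.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a report call that blocks until interrupted.
        Runnable stuck = () -> {
            try {
                new CountDownLatch(1).await();
            } catch (InterruptedException ignored) {
                // interrupted by cancel(true); just return
            }
        };

        boolean reported = reportWithTimeout(stuck, 200);
        System.out.println("report finished in time: " + reported);
        // Stand-in for the cleanup (destroying the subprocess, exiting) that follows the report.
        System.out.println("cleanup reached");
    }
}
```

Called directly, without the timeout wrapper, the stuck Runnable would block its caller and the "cleanup reached" line would never print, which matches what I see in the worker.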