I'm glad to hear I'm not the only one! (no new news yet)
On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <[email protected]> wrote:
> Hi William,
>
> I'm having the same problem running a multilang topology (written in
> Python). If you find a solution, please post it here, it will surely help us.
>
> To upgrade from 0.9.2-incubating we updated storm.py (
> https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
> and pom.xml.
>
> Downgrading to 0.9.2-incubating (downgrading storm.py and pom.xml), it
> works like hell.
>
> Best regards,
>
> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <[email protected]> wrote:
>
>> I'm not sure the best way to share a test case. I'll copy and paste code
>> below.... If you run the below code (and find the log file of the worker
>> that was running it), you should see in ~30 seconds:
>> ====
>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
>> java.lang.RuntimeException: subprocess heartbeat timeout
>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
>> java.lang.RuntimeException: subprocess heartbeat timeout
>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>> =======
>>
>> But, the topology will run in a kind of weird zombie state forever. More
>> specifically I see the multilang bolt process all tuples in the pending
>> queue, and then an infinite loop of nextTuple()/fail() from the multilang
>> spout. But, as noted in my original email, if I comment out:
>> _collector.reportError(exception);
>> in the Java ShellSpout then the worker will immediately die and respawn.
>>
>> If no one can help, the next step for me is rough, as I'll have to learn
>> how to actually develop and debug storm itself, which is usually at least
>> 10x harder than just using something :-)
>>
>> In any case, my test code:
>>
>> Topology = 1 process with two tasks (multilang spout and bolt), and small
>> pool of pending messages (yes, using the word count example in
>> storm-starter as a starting point....)
>> =============
>> public class SlowTopology {
>>     public static class SlowPhpBolt extends ShellBolt implements IRichBolt {
>>
>>         public SlowPhpBolt() {
>>             super("php", "slowBolt.php");
>>         }
>>
>>         @Override
>>         public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>             declarer.declare(new Fields());
>>         }
>>
>>         @Override
>>         public Map<String, Object> getComponentConfiguration() {
>>             return null;
>>         }
>>     }
>>
>>     public static class SlowPhpSpout extends ShellSpout implements IRichSpout {
>>
>>         public SlowPhpSpout() {
>>             super("php", "slowSpout.php");
>>         }
>>
>>         @Override
>>         public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>             ofd.declare(new Fields("output"));
>>         }
>>
>>         @Override
>>         public Map<String, Object> getComponentConfiguration() {
>>             return null;
>>         }
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         TopologyBuilder builder = new TopologyBuilder();
>>
>>         builder.setSpout("spout", new SlowPhpSpout(), 1).setNumTasks(1).setMaxSpoutPending(3);
>>         builder.setBolt("bolt", new SlowPhpBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
>>
>>         Config conf = new Config();
>>         conf.setDebug(true);
>>
>>         if (args != null && args.length > 0) {
>>             conf.setNumWorkers(1);
>>             StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
>>         }
>>         else {
>>             LocalCluster cluster = new LocalCluster();
>>             cluster.submitTopology("slow", conf, builder.createTopology());
>>             Thread.sleep(10000);
>>             cluster.shutdown();
>>         }
>>     }
>> }
>> ===========
>>
>> slowSpout.php
>> ==========
>> <?php
>> require_once "storm.php";
>> class slowSpout extends \ShellSpout {
>>     protected function nextTuple() {
>>         $value = rand(0,100);
>>         $id = rand(0, 100);
>>         $this->emit(array($value), $id);
>>         file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>>         sleep(1);
>>     }
>>     protected function ack($id) {
>>         file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
>>     }
>>
>>     protected function fail($id) {
>>         file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
>>     }
>> }
>>
>> (new slowSpout())->run();
>> ===========
>>
>> slowBolt.php
>> ============
>> <?php
>> require_once "storm.php";
>> class slowBolt extends \BasicBolt {
>>     protected function process(\Tuple $t) {
>>         $sleep = rand(1, 180);
>>         file_put_contents("/tmp/storm_slow.log", "process(".print_r($t, true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
>>         sleep($sleep);
>>     }
>> }
>> (new slowBolt())->run();
>> ============
>>
>> storm.php (from https://github.com/lazyshot/storm-php, and I think I
>> added more error checking on reads/writes to standard in/out, added sync()
>> to the ShellSpout to make new classes easier to write, and the new
>> heartbeat protocol)
>> =========
>> <?php
>> interface iShellBolt {
>> }
>>
>> interface iShellSpout {
>> }
>>
>> class Tuple {
>>     public $id, $component, $stream, $task, $values;
>>
>>     public function __construct($id, $component, $stream, $task, $values) {
>>         $this->id = $id;
>>         $this->component = $component;
>>         $this->stream = $stream;
>>         $this->task = $task;
>>         $this->values = $values;
>>     }
>> }
>>
>> abstract class ShellComponent {
>>     protected $pid;
>>     protected $stormConf;
>>     protected $topologyContext;
>>
>>     protected $stormInc = null;
>>
>>     public function __construct() {
>>         $this->pid = getmypid();
>>         $this->sendCommand(array("pid" => $this->pid));
>>
>>         $handshake = $this->parseMessage($this->waitForMessage());
>>
>>         $this->stormConf = $handshake['conf'];
>>         $this->topologyContext = $handshake['context'];
>>         $pidDir = $handshake['pidDir'];
>>
>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>>     }
>>
>>     protected function readLine() {
>>         $raw = fgets(STDIN);
>>
>>         if ($raw === false) {
>>             throw new Exception("STDIN is broken");
>>         }
>>
>>         $line = trim($raw);
>>
>>         return $line;
>>     }
>>
>>     protected function waitForMessage() {
>>         $message = '';
>>         while (true) {
>>             $line = trim($this->readLine());
>>
>>             if (strlen($line) == 0) {
>>                 continue;
>>             } else if ($line == 'end') {
>>                 break;
>>             } else if ($line == 'sync') {
>>                 $message = '';
>>                 continue;
>>             }
>>
>>             $message .= $line . "\n";
>>         }
>>
>>         return trim($message);
>>     }
>>
>>     protected function sendCommand(array $command) {
>>         $this->sendMessage(json_encode($command));
>>     }
>>
>>     protected function sendLog($message) {
>>         return $this->sendCommand(array(
>>             'command' => 'log',
>>             'msg' => $message
>>         ));
>>     }
>>
>>     protected function parseMessage($message) {
>>         $msg = json_decode($message, true);
>>
>>         if ($msg) {
>>             return $msg;
>>         } else {
>>             return $message;
>>         }
>>     }
>>
>>     protected function sendMessage($message) {
>>         $message = "$message\nend\n";
>>         $bytesWritten = fwrite(STDOUT, $message);
>>         fflush(STDOUT);
>>         if ($bytesWritten === false) {
>>             throw new Exception("STDOUT is broken");
>>         }
>>         if ($bytesWritten != strlen($message)) {
>>             throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
>>         }
>>     }
>>
>>     final protected function sync() {
>>         $command = array(
>>             'command' => 'sync',
>>         );
>>
>>         $this->sendCommand($command);
>>     }
>>
>> }
>>
>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>>
>>     public $anchor_tuple = null;
>>
>>     public function __construct() {
>>         parent::__construct();
>>
>>         $this->init($this->stormConf, $this->topologyContext);
>>     }
>>
>>     public function run() {
>>         try {
>>             while (true) {
>>                 $command = $this->parseMessage($this->waitForMessage());
>>
>>                 if (is_array($command)) {
>>                     if (isset($command['tuple'])) {
>>                         $tupleMap = array_merge(array(
>>                             'id' => null,
>>                             'comp' => null,
>>                             'stream' => null,
>>                             'task' => null,
>>                             'tuple' => null
>>                         ), $command);
>>
>>                         if($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>>                             $this->sync();
>>                         } else {
>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
>>                             $this->process($tuple);
>>                         }
>>                     }
>>                 }
>>             }
>>         } catch (Exception $e) {
>>             $this->sendLog((string)$e);
>>         }
>>     }
>>
>>     abstract protected function process(Tuple $tuple);
>>
>>     protected function init($conf, $topology) {
>>         return;
>>     }
>>
>>     protected function emitTuple(array $tuple, $stream = null, $anchors = array(), $directTask = null) {
>>         if ($this->anchor_tuple !== null) {
>>             $anchors = array($this->anchor_tuple);
>>         }
>>
>>         $command = array(
>>             'command' => 'emit'
>>         );
>>
>>         if ($stream !== null) {
>>             $command['stream'] = $stream;
>>         }
>>
>>         $command['anchors'] = array_map(function ($a) {
>>             return $a->id;
>>         }, $anchors);
>>
>>         if ($directTask !== null) {
>>             $command['task'] = $directTask;
>>         }
>>
>>         $command['tuple'] = $tuple;
>>
>>         $this->sendCommand($command);
>>     }
>>
>>     protected function emit($tuple, $stream = null, $anchors = array()) {
>>         $this->emitTuple($tuple, $stream, $anchors);
>>     }
>>
>>     protected function emitDirect($directTask, $tuple, $stream = null, $anchors = array()) {
>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
>>     }
>>
>>     protected function ack(Tuple $tuple) {
>>         $command = array(
>>             'command' => 'ack',
>>             'id' => $tuple->id
>>         );
>>
>>         $this->sendCommand($command);
>>     }
>>
>>     protected function fail(Tuple $tuple) {
>>         $command = array(
>>             'command' => 'fail',
>>             'id' => $tuple->id
>>         );
>>
>>         $this->sendCommand($command);
>>     }
>> }
>>
>> abstract class BasicBolt extends ShellBolt {
>>     public function run() {
>>         try {
>>             while (true) {
>>                 $command = $this->parseMessage($this->waitForMessage());
>>
>>                 if (is_array($command)) {
>>                     if (isset($command['tuple'])) {
>>                         $tupleMap = array_merge(array(
>>                             'id' => null,
>>                             'comp' => null,
>>                             'stream' => null,
>>                             'task' => null,
>>                             'tuple' => null
>>                         ), $command);
>>
>>                         if($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>>                             $this->sync();
>>                         } else {
>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
>>
>>                             $this->anchor_tuple = $tuple;
>>
>>                             try {
>>                                 $processed = $this->process($tuple);
>>
>>                                 $this->ack($tuple);
>>                             } catch (BoltProcessException $e) {
>>                                 $this->fail($tuple);
>>                             }
>>                         }
>>                     }
>>                 }
>>             }
>>         } catch (Exception $e) {
>>             $this->sendLog((string)$e);
>>         }
>>
>>     }
>> }
>>
>> abstract class ShellSpout extends ShellComponent implements iShellSpout {
>>     protected $tuples = array();
>>
>>     public function __construct() {
>>         parent::__construct();
>>
>>         $this->init($this->stormConf, $this->topologyContext);
>>     }
>>
>>
>>     abstract protected function nextTuple();
>>
>>     abstract protected function ack($tuple_id);
>>
>>     abstract protected function fail($tuple_id);
>>
>>     public function run() {
>>         try {
>>             while (true) {
>>                 $command = $this->parseMessage($this->waitForMessage());
>>
>>                 if (is_array($command)) {
>>                     if (isset($command['command'])) {
>>                         if ($command['command'] == 'ack') {
>>                             $this->ack($command['id']);
>>                             $this->sync();
>>                         } else if ($command['command'] == 'fail') {
>>                             $this->fail($command['id']);
>>                             $this->sync();
>>                         } else if ($command['command'] == 'next') {
>>                             $this->nextTuple();
>>                             $this->sync();
>>                         }
>>                     }
>>                 }
>>             }
>>         } catch (Exception $e) {
>>             $this->sendLog((string)$e);
>>             $this->sync();
>>         }
>>     }
>>
>>     protected function init($stormConf, $topologyContext) {
>>         return;
>>     }
>>
>>     final protected function emit(array $tuple, $messageId = null, $streamId = null) {
>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
>>     }
>>
>>     final protected function emitDirect($directTask, array $tuple, $messageId = null, $streamId = null) {
>>         return $this->emitTuple($tuple, $messageId, $streamId, $directTask);
>>     }
>>
>>     final private function emitTuple(array $tuple, $messageId = null, $streamId = null, $directTask = null) {
>>         $command = array(
>>             'command' => 'emit'
>>         );
>>
>>         if ($messageId !== null) {
>>             $command['id'] = $messageId;
>>         }
>>
>>         if ($streamId !== null) {
>>             $command['stream'] = $streamId;
>>         }
>>
>>         if ($directTask !== null) {
>>             $command['task'] = $directTask;
>>         }
>>
>>         $command['tuple'] = $tuple;
>>
>>         return $this->sendCommand($command);
>>     }
>> }
>>
>> class BoltProcessException extends Exception {
>> }
>>
>> =========================
>>
>>
>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>>>
>>> I'll try to cover the important facts that led to this issue:
>>>
>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some existing
>>> business logic
>>>
>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition to
>>> the ShellBolt protocol)
>>>
>>> -I have some odd topologies where I try to do some legacy background
>>> processing. This processing takes a highly variable amount of time in the
>>> Bolts, from milliseconds to minutes. But, eventually due to randomness the
>>> spout's "pending" pool fills up, causing the spout to block on nextTuple,
>>> which eventually causes a heartbeat timeout. (I believe my only fix is to
>>> increase the heartbeat timeout at the topology level. That's not the
>>> purpose of this email, though confirmation of this as my only workaround
>>> would be appreciated! I feel like this wasn't anticipated when the
>>> heartbeat patch was designed, as it was assumed the spout's nextTuple
>>> wouldn't block I guess?)
>>>
>>> -The purpose of this email is the fact that the topology "jams up" when
>>> the ShellSpout has a heartbeat timeout. I can see my PHP spout/bolt still
>>> running (I added logging to them), but Storm itself is doing nothing.
>>>
>>> -I added logging to ShellSpout and recompiled, because I saw the log
>>> message on line 233 (Halting process: ShellSpout died) but as noted the PHP
>>> process was still running, so I was curious if _process.destroy(); failed.
>>> But, my logging didn't appear. I assumed I was compiling/deploying wrong.
>>> Eventually I commented out line 234: _collector.reportError(exception);
>>> and everything started working!!!
>>>
>>> Does this make *any* sense? Why would
>>> _collector.reportError(exception); block and never return (I waited quite a
>>> long time, tens of minutes)? When I comment out line 234, Storm
>>> immediately kills my bad tasks and respawns almost instantly.
>>>
>>> I feel fairly confident that this will be recreatable. My topology:
>>> -1 spout (ShellSpout)
>>> -1 bolt (ShellBolt)
>>> -The ShellSpout has a heartbeat timeout due to slow tasks in ShellBolt +
>>> the pending queue is full
>>>
>>> Thanks for any feedback!
>>>
>>> will
>>>
>
> --
> Alex Sobrino Beltrán
> Registered Linux User #273657
>
> http://v5tech.es
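
For anyone trying to reproduce this without PHP: the message framing that the quoted storm.php relies on (each message is text followed by a line containing only "end", blank lines are skipped, and a bare "sync" line discards whatever has been read so far) can be sketched in plain Java. This is only an illustration of that framing under those assumptions, not Storm's own code; the class MultilangFraming and the methods readMessage, frame and readAll are hypothetical names, and unlike storm.php the reader returns null at end of stream instead of throwing.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring waitForMessage()/sendMessage() from the quoted storm.php.
final class MultilangFraming {

    // Reads lines until a line equal to "end"; returns the accumulated message,
    // or null if the stream ends first (storm.php throws in that case).
    static String readMessage(BufferedReader in) throws IOException {
        StringBuilder message = new StringBuilder();
        String raw;
        while ((raw = in.readLine()) != null) {
            String line = raw.trim();
            if (line.isEmpty()) {
                continue;                 // skip blank lines
            } else if (line.equals("end")) {
                return message.toString().trim();
            } else if (line.equals("sync")) {
                message.setLength(0);     // drop anything read so far
                continue;
            }
            message.append(line).append('\n');
        }
        return null;
    }

    // Frames an outgoing message the way sendMessage() does: payload, newline, "end", newline.
    static String frame(String payload) {
        return payload + "\nend\n";
    }

    // Convenience for experiments: splits a captured byte stream into its messages.
    static List<String> readAll(String wire) throws IOException {
        List<String> messages = new ArrayList<>();
        BufferedReader in = new BufferedReader(new StringReader(wire));
        String message;
        while ((message = readMessage(in)) != null) {
            messages.add(message);
        }
        return messages;
    }

    public static void main(String[] args) throws IOException {
        String wire = frame("{\"command\":\"next\"}")
                    + frame("{\"command\":\"ack\",\"id\":\"42\"}");
        for (String message : readAll(wire)) {
            System.out.println(message);
        }
    }
}
```

Feeding a captured stdin stream through readAll is a quick way to see which commands (next, ack, fail, or a heartbeat tuple) the subprocess actually received before the timeout fired.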
