I'm glad to hear I'm not the only one!

(No news yet.)

On Wed, Feb 11, 2015 at 3:09 AM, Alex Sobrino <[email protected]> wrote:

> Hi William,
>
> I'm having the same problem running a multilang topology (written in
> Python). If you find a solution, please post it here; it will surely help us.
>
> To upgrade from 0.9.2-incubating we updated storm.py (
> https://raw.githubusercontent.com/apache/storm/master/storm-core/src/multilang/py/storm.py)
> and pom.xml.
>
> Downgrading to 0.9.2-incubating (downgrading storm.py and pom.xml), it
> works perfectly.
>
> Best regards,
>
> On Tue, Feb 10, 2015 at 8:59 PM, William Oberman <[email protected]> wrote:
>
>> I'm not sure of the best way to share a test case, so I'll copy and paste
>> the code below.  If you run it (and find the log file of the worker that
>> was running it), you should see the following within ~30 seconds:
>> ====
>> 2015-02-10T14:34:02.649-0500 b.s.s.ShellSpout [ERROR] Halting process: ShellSpout died.
>> java.lang.RuntimeException: subprocess heartbeat timeout
>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>> 2015-02-10T14:34:02.649-0500 b.s.d.executor [ERROR]
>> java.lang.RuntimeException: subprocess heartbeat timeout
>>         at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:255) [storm-core-0.9.3.jar:0.9.3]
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_71]
>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_71]
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_71]
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_71]
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
>> =======
>>
>> But the topology then runs in a weird zombie state forever.  More
>> specifically, I see the multilang bolt process all tuples in the pending
>> queue, followed by an infinite loop of nextTuple()/fail() from the multilang
>> spout.  But, as noted in my original email, if I comment out:
>>  _collector.reportError(exception);
>> in the Java ShellSpout, the worker immediately dies and respawns.
>>
>> If no one can help, my next step will be rough, as I'll have to learn
>> how to actually develop and debug Storm itself, which is usually at least
>> 10x harder than just using something :-)
>>
>> In any case, my test code:
>>
>> Topology = 1 process with two tasks (a multilang spout and a multilang
>> bolt) and a small pool of pending messages (yes, I used the word count
>> example in storm-starter as a starting point...)
>> =============
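>> import java.util.Map;
>>
>> import backtype.storm.Config;
>> import backtype.storm.LocalCluster;
>> import backtype.storm.StormSubmitter;
>> import backtype.storm.spout.ShellSpout;
>> import backtype.storm.task.ShellBolt;
>> import backtype.storm.topology.IRichBolt;
>> import backtype.storm.topology.IRichSpout;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.TopologyBuilder;
>> import backtype.storm.tuple.Fields;
>>
>> public class SlowTopology {
>>   // Multilang bolt that runs slowBolt.php as a subprocess.
>>   public static class SlowPhpBolt extends ShellBolt implements IRichBolt {
>>
>>     public SlowPhpBolt() {
>>       super("php", "slowBolt.php");
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>       // The bolt emits nothing, so no output fields are declared.
>>       declarer.declare(new Fields());
>>     }
>>
>>     @Override
>>     public Map<String, Object> getComponentConfiguration() {
>>       return null;
>>     }
>>   }
>>
>>   // Multilang spout that runs slowSpout.php as a subprocess.
>>   public static class SlowPhpSpout extends ShellSpout implements IRichSpout {
>>
>>     public SlowPhpSpout() {
>>       super("php", "slowSpout.php");
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>       ofd.declare(new Fields("output"));
>>     }
>>
>>     @Override
>>     public Map<String, Object> getComponentConfiguration() {
>>       return null;
>>     }
>>   }
>>
>>   public static void main(String[] args) throws Exception {
>>
>>     TopologyBuilder builder = new TopologyBuilder();
>>
>>     // One spout task with at most 3 pending (un-acked) tuples.
>>     builder.setSpout("spout", new SlowPhpSpout(), 1).setNumTasks(1).setMaxSpoutPending(3);
>>     builder.setBolt("bolt", new SlowPhpBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
>>
>>     Config conf = new Config();
>>     conf.setDebug(true);
>>
>>     if (args != null && args.length > 0) {
>>       // Cluster mode: everything runs in a single worker process.
>>       conf.setNumWorkers(1);
>>       StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
>>     }
>>     else {
>>       // Local mode runs for only 10 s, less than the ~30 s after which the
>>       // heartbeat timeout showed up in the worker log.
>>       LocalCluster cluster = new LocalCluster();
>>       cluster.submitTopology("slow", conf, builder.createTopology());
>>       Thread.sleep(10000);
>>       cluster.shutdown();
>>     }
>>   }
>> }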
>> ===========
>>
>> slowSpout.php
>> ==========
>> <?php
>> require_once "storm.php";
>> class slowSpout extends \ShellSpout {
>>   protected function nextTuple() {
>>     $value = rand(0,100);
>>     $id = rand(0, 100);
>>     $this->emit(array($value), $id);
>>     file_put_contents("/tmp/storm_slow.log", "nextTuple()->value[$value] id[$id]\n", FILE_APPEND);
>>     sleep(1);
>>   }
>>   protected function ack($id) {
>>     file_put_contents("/tmp/storm_slow.log", "ack($id)\n", FILE_APPEND);
>>   }
>>
>>   protected function fail($id) {
>>     file_put_contents("/tmp/storm_slow.log", "fail($id)\n", FILE_APPEND);
>>   }
>> }
>>
>> (new slowSpout())->run();
>> ===========
>>
>> slowBolt.php
>> ============
>> <?php
>> require_once "storm.php";
>> class slowBolt extends \BasicBolt {
>>   protected function process(\Tuple $t) {
>>     $sleep = rand(1, 180);
>>     file_put_contents("/tmp/storm_slow.log", "process(".print_r($t, true)."), sleeping for sleep[$sleep]\n", FILE_APPEND);
>>     sleep($sleep);
>>   }
>> }
>> (new slowBolt())->run();
>> ============
>>
>> storm.php (from https://github.com/lazyshot/storm-php; I think I added
>> more error checking on reads/writes to stdin/stdout, a sync() for the
>> ShellSpout to make new classes easier to write, and support for the new
>> heartbeat protocol)
>> =========
>> <?php
>> interface iShellBolt {
>> }
>>
>> interface iShellSpout {
>> }
>>
>> class Tuple {
>>     public $id, $component, $stream, $task, $values;
>>
>>     public function __construct($id, $component, $stream, $task, $values) {
>>         $this->id = $id;
>>         $this->component = $component;
>>         $this->stream = $stream;
>>         $this->task = $task;
>>         $this->values = $values;
>>     }
>> }
>>
>> abstract class ShellComponent {
>>     protected $pid;
>>     protected $stormConf;
>>     protected $topologyContext;
>>
>>     protected $stormInc = null;
>>
>>     public function __construct() {
>>         $this->pid = getmypid();
>>         $this->sendCommand(array("pid" => $this->pid));
>>
>>         $handshake = $this->parseMessage($this->waitForMessage());
>>
>>         $this->stormConf = $handshake['conf'];
>>         $this->topologyContext = $handshake['context'];
>>         $pidDir = $handshake['pidDir'];
>>
>>         @fclose(@fopen($pidDir . "/" . $this->pid, "w"));
>>     }
>>
>>     protected function readLine() {
>>         $raw = fgets(STDIN);
>>
>>         if ($raw === false) {
>>             throw new Exception("STDIN is broken");
>>         }
>>
>>         $line = trim($raw);
>>
>>         return $line;
>>     }
>>
>>     protected function waitForMessage() {
>>         $message = '';
>>         while (true) {
>>             $line = trim($this->readLine());
>>
>>             if (strlen($line) == 0) {
>>                 continue;
>>             } else if ($line == 'end') {
>>                 break;
>>             } else if ($line == 'sync') {
>>                 $message = '';
>>                 continue;
>>             }
>>
>>             $message .= $line . "\n";
>>         }
>>
>>         return trim($message);
>>     }
>>
>>     protected function sendCommand(array $command) {
>>         $this->sendMessage(json_encode($command));
>>     }
>>
>>     protected function sendLog($message) {
>>         return $this->sendCommand(array(
>>             'command' => 'log',
>>             'msg' => $message
>>         ));
>>     }
>>
>>     protected function parseMessage($message) {
>>         $msg = json_decode($message, true);
>>
>>         if ($msg) {
>>             return $msg;
>>         } else {
>>             return $message;
>>         }
>>     }
>>
>>     protected function sendMessage($message) {
>>         $message = "$message\nend\n";
>>         $bytesWritten = fwrite(STDOUT, $message);
>>         fflush(STDOUT);
>>         if ($bytesWritten === false) {
>>             throw new Exception("STDOUT is broken");
>>         }
>>         if ($bytesWritten != strlen($message)) {
>>             throw new Exception("Unable to write all bytes to STDOUT (message=$message, bytesWritten=$bytesWritten)");
>>         }
>>     }
>>
>>     final protected function sync() {
>>         $command = array(
>>             'command' => 'sync',
>>         );
>>
>>         $this->sendCommand($command);
>>     }
>>
>> }
>>
>> abstract class ShellBolt extends ShellComponent implements iShellBolt {
>>
>>     public $anchor_tuple = null;
>>
>>     public function __construct() {
>>         parent::__construct();
>>
>>         $this->init($this->stormConf, $this->topologyContext);
>>     }
>>
>>     public function run() {
>>         try {
>>             while (true) {
>>                 $command = $this->parseMessage($this->waitForMessage());
>>
>>                 if (is_array($command)) {
>>                     if (isset($command['tuple'])) {
>>                         $tupleMap = array_merge(array(
>>                                 'id' => null,
>>                                 'comp' => null,
>>                                 'stream' => null,
>>                                 'task' => null,
>>                                 'tuple' => null
>>                             ),
>>
>>                             $command);
>>
>>                         // Heartbeat from ShellBolt (task -1, stream "__heartbeat"): answer with sync.
>>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>>                             $this->sync();
>>                         } else {
>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
>>                             $this->process($tuple);
>>                         }
>>                     }
>>                 }
>>             }
>>         } catch (Exception $e) {
>>             $this->sendLog((string)$e);
>>         }
>>     }
>>
>>     abstract protected function process(Tuple $tuple);
>>
>>     protected function init($conf, $topology) {
>>         return;
>>     }
>>
>>     protected function emitTuple(array $tuple, $stream = null, $anchors = array(), $directTask = null) {
>>         if ($this->anchor_tuple !== null) {
>>             $anchors = array($this->anchor_tuple);
>>         }
>>
>>         $command = array(
>>             'command' => 'emit'
>>         );
>>
>>         if ($stream !== null) {
>>             $command['stream'] = $stream;
>>         }
>>
>>         $command['anchors'] = array_map(function ($a) {
>>             return $a->id;
>>         }, $anchors);
>>
>>         if ($directTask !== null) {
>>             $command['task'] = $directTask;
>>         }
>>
>>         $command['tuple'] = $tuple;
>>
>>         $this->sendCommand($command);
>>     }
>>
>>     protected function emit($tuple, $stream = null, $anchors = array()) {
>>         $this->emitTuple($tuple, $stream, $anchors);
>>     }
>>
>>     protected function emitDirect($directTask, $tuple, $stream = null, $anchors = array()) {
>>         $this->emitTuple($tuple, $stream, $anchors, $directTask);
>>     }
>>
>>     protected function ack(Tuple $tuple) {
>>         $command = array(
>>             'command' => 'ack',
>>             'id' => $tuple->id
>>         );
>>
>>         $this->sendCommand($command);
>>     }
>>
>>     protected function fail(Tuple $tuple) {
>>         $command = array(
>>             'command' => 'fail',
>>             'id' => $tuple->id
>>         );
>>
>>         $this->sendCommand($command);
>>     }
>> }
>>
>> abstract class BasicBolt extends ShellBolt {
>>     public function run() {
>>         try {
>>             while (true) {
>>                 $command = $this->parseMessage($this->waitForMessage());
>>
>>                 if (is_array($command)) {
>>                     if (isset($command['tuple'])) {
>>                         $tupleMap = array_merge(array(
>>                                 'id' => null,
>>                                 'comp' => null,
>>                                 'stream' => null,
>>                                 'task' => null,
>>                                 'tuple' => null
>>                             ),
>>
>>                             $command);
>>
>>                         // Heartbeat from ShellBolt (task -1, stream "__heartbeat"): answer with sync.
>>                         if ($tupleMap['task'] == -1 && $tupleMap['stream'] == "__heartbeat") {
>>                             $this->sync();
>>                         } else {
>>                             $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']);
>>
>>                             $this->anchor_tuple = $tuple;
>>
>>                             try {
>>                                 $processed = $this->process($tuple);
>>
>>                                 $this->ack($tuple);
>>                             } catch (BoltProcessException $e) {
>>                                 $this->fail($tuple);
>>                             }
>>                         }
>>                     }
>>                 }
>>             }
>>         } catch (Exception $e) {
>>             $this->sendLog((string)$e);
>>         }
>>
>>     }
>> }
>>
>> abstract class ShellSpout extends ShellComponent implements iShellSpout {
>>     protected $tuples = array();
>>
>>     public function __construct() {
>>         parent::__construct();
>>
>>         $this->init($this->stormConf, $this->topologyContext);
>>     }
>>
>>
>>     abstract protected function nextTuple();
>>
>>     abstract protected function ack($tuple_id);
>>
>>     abstract protected function fail($tuple_id);
>>
>>     public function run() {
>>         try {
>>             while (true) {
>>                 $command = $this->parseMessage($this->waitForMessage());
>>
>>                 if (is_array($command)) {
>>                     if (isset($command['command'])) {
>>                         if ($command['command'] == 'ack') {
>>                             $this->ack($command['id']);
>>                             $this->sync();
>>                         } else if ($command['command'] == 'fail') {
>>                             $this->fail($command['id']);
>>                             $this->sync();
>>                         } else if ($command['command'] == 'next') {
>>                             $this->nextTuple();
>>                             $this->sync();
>>                         }
>>                     }
>>                 }
>>             }
>>         } catch (Exception $e) {
>>             $this->sendLog((string)$e);
>>             $this->sync();
>>         }
>>     }
>>
>>     protected function init($stormConf, $topologyContext) {
>>         return;
>>     }
>>
>>     final protected function emit(array $tuple, $messageId = null, $streamId = null) {
>>         return $this->emitTuple($tuple, $messageId, $streamId, null);
>>     }
>>
>>     final protected function emitDirect($directTask, array $tuple, $messageId = null, $streamId = null) {
>>         return $this->emitTuple($tuple, $messageId, $streamId, $directTask);
>>     }
>>
>>     private function emitTuple(array $tuple, $messageId = null, $streamId = null, $directTask = null) {
>>         $command = array(
>>             'command' => 'emit'
>>         );
>>
>>         if ($messageId !== null) {
>>             $command['id'] = $messageId;
>>         }
>>
>>         if ($streamId !== null) {
>>             $command['stream'] = $streamId;
>>         }
>>
>>         if ($directTask !== null) {
>>             $command['task'] = $directTask;
>>         }
>>
>>         $command['tuple'] = $tuple;
>>
>>         return $this->sendCommand($command);
>>     }
>> }
>>
>> class BoltProcessException extends Exception {
>> }
>>
>> =========================
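The message framing used by the storm.php code above can be sketched in Java: each message is a JSON payload followed by a line containing only `end`, blank lines are skipped, and (mirroring the PHP `waitForMessage()`, not necessarily the official protocol) a bare `sync` line discards whatever has been read so far. Bolt heartbeats arrive as tuples with task -1 on stream `__heartbeat`, which the PHP code answers with a `sync` command. `MultilangFraming` and its methods are hypothetical names, and JSON parsing is deliberately left out because the JDK has no built-in JSON parser.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

// Hypothetical helper mirroring storm.php's sendMessage()/waitForMessage().
public class MultilangFraming {

    // Frame one JSON payload the way sendMessage() does: "<json>\nend\n".
    public static String frame(String json) {
        return json + "\nend\n";
    }

    // Read one message: skip blank lines, stop at "end", and (as in the PHP code)
    // drop anything accumulated so far when a bare "sync" line appears.
    // Returns null if the stream ends first (storm.php throws "STDIN is broken").
    public static String readMessage(BufferedReader in) throws IOException {
        StringBuilder message = new StringBuilder();
        String raw;
        while ((raw = in.readLine()) != null) {
            String line = raw.trim();
            if (line.isEmpty()) {
                continue;
            } else if (line.equals("end")) {
                return message.toString().trim();
            } else if (line.equals("sync")) {
                message.setLength(0);
                continue;
            }
            message.append(line).append('\n');
        }
        return null;
    }

    // Same test the PHP bolt loops use to recognise a heartbeat tuple.
    public static boolean isHeartbeat(long task, String stream) {
        return task == -1 && "__heartbeat".equals(stream);
    }

    public static void main(String[] args) throws IOException {
        String wire = frame("{\"command\":\"next\"}") + "\n" + frame("{\"command\":\"ack\",\"id\":42}");
        BufferedReader in = new BufferedReader(new StringReader(wire));
        System.out.println(readMessage(in));                 // {"command":"next"}
        System.out.println(readMessage(in));                 // {"command":"ack","id":42}
        System.out.println(readMessage(in));                 // null (end of stream)
        System.out.println(isHeartbeat(-1, "__heartbeat"));  // true
    }
}
```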
>>
>>
>> On Fri, Feb 6, 2015 at 9:48 AM, William Oberman <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> For reference, I'm talking about 0.9.3 ShellSpout, line 234.
>>>
>>> I'll try to cover the important facts that led to this issue:
>>>
>>> -I was on 0.9.2 using multilang to bridge to PHP to get to some existing
>>> business logic
>>>
>>> -I'm testing the 0.9.3 upgrade (yes, I see the new heartbeat addition to
>>> the ShellBolt protocol)
>>>
>>> -I have some odd topologies where I try to do some legacy background
>>> processing.  This processing takes a highly variable amount of time in the
>>> bolts, from milliseconds to minutes.  Eventually, due to randomness, the
>>> spout's "pending" pool fills up, causing the spout to block on nextTuple,
>>> which eventually causes a heartbeat timeout.  (I believe my only fix is to
>>> increase the heartbeat timeout at the topology level.  That's not the
>>> purpose of this email, though confirmation that this is my only workaround
>>> would be appreciated!  I suspect this case wasn't anticipated when the
>>> heartbeat patch was designed; I guess it was assumed that the spout's
>>> nextTuple wouldn't block?)
>>>
>>> -The purpose of this email is the fact that the topology "jams up" when
>>> the ShellSpout has a heartbeat timeout.  I can see my PHP spout/bolt still
>>> running (I added logging to them), but Storm itself is doing nothing.
>>>
>>> -I added logging to ShellSpout and recompiled, because I saw the log
>>> message on line 233 (Halting process: ShellSpout died), but as noted the PHP
>>> process was still running, so I was curious whether _process.destroy(); had failed.
>>> But my logging didn't appear, so I assumed I was compiling/deploying wrong.
>>> Eventually I commented out line 234: _collector.reportError(exception);
>>>  and everything started working!!!
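The timeout described in these notes behaves like a watchdog. A minimal sketch, assuming (only from the `SpoutHeartbeatTimerTask` frame in the stack trace and the ~30 s delay reported earlier in the thread) that a periodic timer compares the current time with the time of the subprocess's last reply and declares it dead once the gap exceeds a fixed timeout. `HeartbeatWatchdog` and its methods are hypothetical names; this is not Storm's ShellSpout code, and the 30 s value in `main` is an assumed figure.

```java
// Hypothetical heartbeat watchdog model (not Storm's actual implementation).
public class HeartbeatWatchdog {
    private final long timeoutMillis;
    private long lastHeartbeatMillis;

    public HeartbeatWatchdog(long timeoutMillis, long startMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastHeartbeatMillis = startMillis;
    }

    // Called whenever the subprocess replies; for the PHP spout above that is
    // the sync it sends after handling next, ack, or fail.
    public void recordHeartbeat(long nowMillis) {
        lastHeartbeatMillis = nowMillis;
    }

    // Called by the periodic timer; in this model, true means the subprocess
    // would be declared dead ("subprocess heartbeat timeout").
    public boolean isTimedOut(long nowMillis) {
        return nowMillis - lastHeartbeatMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        HeartbeatWatchdog watchdog = new HeartbeatWatchdog(30_000, 0); // assumed 30 s timeout
        watchdog.recordHeartbeat(5_000);                 // last reply at t = 5 s
        System.out.println(watchdog.isTimedOut(35_000)); // false: exactly 30 s of silence
        System.out.println(watchdog.isTimedOut(35_001)); // true: just over 30 s
    }
}
```

Because only replies refresh the timestamp in this model, a spout subprocess that is not being sent next/ack/fail (for example while the pending pool is full) trips the check even though the PHP process itself is healthy.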
>>>
>>> Does this make *any* sense?  Why would
>>> _collector.reportError(exception); block and never return (I waited quite a
>>> long time, tens of minutes)?  When I comment out line 234, Storm
>>> immediately kills my bad tasks and respawns them almost instantly.
>>>
>>> I'm fairly confident that this is reproducible.  My topology:
>>> -1 spout (ShellSpout)
>>> -1 bolt (ShellBolt)
>>> -The ShellSpout has a heartbeat timeout due to slow tasks in ShellBolt +
>>> the pending queue is full
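The scenario in this list can be illustrated with a toy discrete-time simulation. Its assumptions are not taken from Storm's source: the spout emits one tuple per second whenever fewer than `maxPending` tuples are in flight (as slowSpout.php does with `sleep(1)`), the single bolt handles queued tuples one at a time, and only an emit or an ack delivered to the spout counts as a heartbeat. `PendingPoolSim` and `firstTimeout` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical discrete-time model of the reported topology (not Storm code).
public class PendingPoolSim {

    // Returns the first second at which the spout has been silent for more than
    // timeoutSeconds, or -1 if that never happens up to `horizon`.
    // Bolt durations (whole seconds, >= 1) are used cyclically.
    public static int firstTimeout(int maxPending, int[] boltSeconds,
                                   int timeoutSeconds, int horizon) {
        int pending = 0;        // tuples emitted but not yet acked
        int emitted = 0;        // total tuples emitted so far
        int lastHeartbeat = 0;  // last second the spout subprocess was contacted
        int busyUntil = -1;     // second the bolt finishes its tuple (-1 = idle)
        Deque<Integer> queue = new ArrayDeque<>(); // tuples waiting for the bolt

        for (int t = 0; t <= horizon; t++) {
            // Bolt finishes its tuple; the ack is delivered to the spout.
            if (busyUntil != -1 && busyUntil <= t) {
                pending--;
                lastHeartbeat = t;
                busyUntil = -1;
            }
            // In this model the spout is only asked for a tuple while the pool has room.
            if (pending < maxPending) {
                queue.addLast(emitted++);
                pending++;
                lastHeartbeat = t;
            }
            // An idle bolt picks up the oldest queued tuple.
            if (busyUntil == -1 && !queue.isEmpty()) {
                busyUntil = t + boltSeconds[queue.removeFirst() % boltSeconds.length];
            }
            if (t - lastHeartbeat > timeoutSeconds) {
                return t;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Bolt takes 10 s per tuple: acks keep arriving, so no timeout.
        System.out.println(firstTimeout(3, new int[] {10}, 30, 1000)); // -1
        // Bolt takes 45 s per tuple: the pool is full at t=2 and silence exceeds 30 s at t=33.
        System.out.println(firstTimeout(3, new int[] {45}, 30, 1000)); // 33
    }
}
```

With the bolt sleeping `rand(1, 180)` seconds, a single tuple that takes noticeably longer than the timeout is enough, in this model, to leave the full pool silent long enough to trip the check; raising the timeout only moves that threshold.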
>>>
>>> Thanks for any feedback!
>>>
>>> will
>>>
>>>
>>>
>>
>>
>>
>
>
> --
> Alex Sobrino Beltrán
> Registered Linux User #273657
>
> http://v5tech.es
>
