hi Guys,

In Storm UI I see the following:
java.lang.RuntimeException: subprocess heartbeat timeout at
org.apache.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:311)
at java.util.concurrent.Executors$RunnableAdapter.call(

I'm using petrel library (Python) to work with Storm.
https://github.com/AirSage/Petrel/

I also use kafka-python (https://github.com/dpkp/kafka-python) to work with
kafka

Kafka 0.10
Storm 1.0.2
Python 3.5

java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

**My kafka spout is pretty simple**

import time
import config as cfgfrom petrel import stormfrom petrel.emitter import Spout
# enable loggers
log = cfg.config.loggers.kafka

class KafkaSpout(Spout):

    def __init__(self):
        self.stream = cfg.config.kafka
        # print(__file__)
        super().__init__(script=__file__)

    @classmethod
    def declareOutputFields(cls):
        return ['record']

    def nextTuple(self):
        try:
            record = next(self.stream)
            log.info("{}:{}:{}: key={} value={}".format(
                record.topic,
                record.partition,
                record.offset,
                record.key,
                record.value))
            storm.emit([record.value.decode()])
        finally:
            self.stream.commit()

def run():
    KafkaSpout().run()


So when messages are going into kafka - the whole process works properly.
However, if messages delay then record = next(self.stream) blocks execution
until new message comes in = and this crashes the worker
see attached log: worker.log
https://gist.github.com/dmitry-saritasa/188a70bef2dd402f2ad2648568ce2417

Any ideas how to overcome such problem?

Reply via email to