Hi guys,

In the Storm UI I see the following:

    java.lang.RuntimeException: subprocess heartbeat timeout
        at org.apache.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:311)
        at java.util.concurrent.Executors$RunnableAdapter.call(

I'm using the Petrel library (Python) to work with Storm: https://github.com/AirSage/Petrel/

I also use kafka-python (https://github.com/dpkp/kafka-python) to work with Kafka.

- Kafka 0.10
- Storm 1.0.2
- Python 3.5

Output of java -version:

    java version "1.8.0_121"
    Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
    Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

**My Kafka spout is pretty simple**

    import time
    import config as cfg
    from petrel import storm
    from petrel.emitter import Spout

    # enable loggers
    log = cfg.config.loggers.kafka


    class KafkaSpout(Spout):
        def __init__(self):
            self.stream = cfg.config.kafka
            # print(__file__)
            super().__init__(script=__file__)

        @classmethod
        def declareOutputFields(cls):
            return ['record']

        def nextTuple(self):
            try:
                record = next(self.stream)
                log.info("{}:{}:{}: key={} value={}".format(
                    record.topic, record.partition, record.offset,
                    record.key, record.value))
                storm.emit([record.value.decode()])
            finally:
                self.stream.commit()


    def run():
        KafkaSpout().run()

As long as messages keep arriving in Kafka, the whole process works properly. However, if messages are delayed, then record = next(self.stream) blocks execution until a new message comes in, and this crashes the worker with the heartbeat timeout shown above.

See the attached log, worker.log: https://gist.github.com/dmitry-saritasa/188a70bef2dd402f2ad2648568ce2417

Any ideas how to overcome this problem?
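
One direction that might be worth trying, as a rough sketch rather than a tested fix: make nextTuple return quickly when no message is available instead of blocking in next(). kafka-python's KafkaConsumer accepts a consumer_timeout_ms argument that makes iteration raise StopIteration when nothing arrives within that window, so the spout could catch it and simply return. The sketch below assumes that behaviour and uses a hypothetical IdleTimeoutStream stand-in (not part of kafka-python or Petrel) so it runs without a broker; next_or_none and next_tuple are illustrative names as well.

```python
import time


class IdleTimeoutStream:
    """Hypothetical stand-in for a consumer whose iteration gives up after a timeout.

    Assumption: a KafkaConsumer created with consumer_timeout_ms=<N> behaves
    similarly, raising StopIteration from next() when no message arrives in time.
    """

    def __init__(self, records):
        self._records = list(records)

    def __iter__(self):
        return self

    def __next__(self):
        if not self._records:
            # Simulates "no message arrived within the timeout".
            raise StopIteration
        return self._records.pop(0)


def next_or_none(stream):
    """Return the next record, or None if the stream timed out."""
    try:
        return next(stream)
    except StopIteration:
        return None


def next_tuple(stream, emit, idle_sleep=0.01):
    """Illustrative nextTuple body: emit one record if available, else return quickly.

    Returns True if a record was emitted, False if the stream was idle.
    """
    record = next_or_none(stream)
    if record is None:
        # Brief pause so an idle spout does not spin at full speed.
        time.sleep(idle_sleep)
        return False
    emit([record])
    return True


if __name__ == "__main__":
    emitted = []
    stream = IdleTimeoutStream(["first message"])
    next_tuple(stream, emitted.append)   # emits the only record
    next_tuple(stream, emitted.append)   # idle: returns without blocking
    print(emitted)
```

In the real spout, emit would be storm.emit and stream would be the configured consumer; the commit call would presumably only be needed on the branch where a record was actually emitted. Whether returning from nextTuple without emitting is enough to keep the heartbeat alive depends on how Petrel handles the multilang sync, which I have not verified.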
