FYI - submitted a pull request for STORM-1928: https://github.com/apache/storm/pull/1526
It's against the master branch, but since ShellSpout is not heavily modified
across minor versions, you can easily apply the patch to Storm 0.10.x and
build your own storm-core package if you want to evaluate it.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Jun 24, 2016 at 12:04 AM, Jungtaek Lim <[email protected]> wrote:

> FYI - filed the issue myself here:
> https://issues.apache.org/jira/browse/STORM-1928
>
> On Thu, Jun 23, 2016 at 11:33 PM, Jungtaek Lim <[email protected]> wrote:
>
>> Does get_simple_consumer() block while waiting for messages? If it does,
>> can we set a timeout on it?
>>
>> By the way, I found an edge case in ShellSpout (not sure if it matches
>> your case): when nextTuple has no chance of being called (max spout
>> pending, backpressure introduced in 1.0.0, and so on), the multi-lang
>> spout doesn't receive any message from ShellSpout, so no messages come
>> from the multi-lang spout side either, and no heartbeat is marked.
>>
>> I'll file an issue and see if it's easy to fix.
>>
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Thu, Jun 23, 2016 at 10:59 PM, cogumelosmaravilha
>> <[email protected]> wrote:
>>
>>> Python Petrel spout sample:
>>>
>>> from pykafka import KafkaClient
>>> from petrel import storm
>>> from petrel.emitter import Spout
>>>
>>> class MPDataSpoutInd(Spout):
>>>     """Topology spout."""
>>>
>>>     def __init__(self):
>>>         self.kafclient = KafkaClient(hosts="x.x.x.x:9092")
>>>         super(MPDataSpoutInd, self).__init__(script=__file__)
>>>
>>>     @classmethod
>>>     def declareOutputFields(cls):
>>>         return ['x_id', 'y_id', 'z_id', 'date_stamp',
>>>                 'time_stamp', 'a', 'b', 'c', 'date_start',
>>>                 'time_start', 'users']
>>>
>>>     def initialize(self, conf, context):
>>>         topic = self.kafclient.topics['individual_data']
>>>         consumer = topic.get_simple_consumer()
>>>         consumer.reset_offsets()
>>>
>>>     def nextTuple(self):
>>>         topic = self.kafclient.topics['individual_data']
>>>         consumer = topic.get_simple_consumer()
>>>         for message in consumer:
>>>             if message is not None:
>>>                 x_id = message.value[0:24]
>>>                 y_id = message.value[25:49]
>>>                 z_id = message.value[50:74]
>>>                 date_stamp = message.value[75:85]
>>>                 time_stamp = message.value[86:98]
>>>                 a = message.value[99:102]
>>>                 b = message.value[103:106]
>>>                 c = message.value[107:110]
>>>                 start_date = message.value[111:121]
>>>                 start_time = message.value[122:134]
>>>                 users = message.value[135:138]
>>>                 datetime_str = date_stamp + ' ' + time_stamp
>>>                 datetime_start = start_date + ' ' + start_time
>>>
>>>                 if datetime_str > datetime_start:
>>>                     storm.emit([x_id, y_id, z_id, date_stamp,
>>>                                 time_stamp, str(a), str(b), str(c),
>>>                                 start_date,
>>>                                 start_time, str(users)])
>>>
>>> def run():
>>>     """Entry point required by Petrel."""
>>>     MPDataSpoutInd().run()
>>>
>>> On 23-06-2016 13:34, Jungtaek Lim wrote:
>>>
>>> Could you share the implementation of the spout?
>>> In multi-lang, user-level functions shouldn't block, so a heartbeat
>>> timeout will occur if your spout sends a request to Kafka and waits
>>> for the response indefinitely.
>>>
>>> - Jungtaek Lim (HeartSaVioR)
>>>
>>> On Thu, Jun 23, 2016 at 7:25 PM, cogumelosmaravilha
>>> <[email protected]> wrote:
>>>
>>>> Yes, but if you submit to Storm, it's OK, with or without an empty
>>>> Kafka.
>>>>
>>>> On 23-06-2016 10:40, ram kumar wrote:
>>>>
>>>> Yes. The same topology works when I run it in a single-node cluster,
>>>> and when there is no data to consume from Kafka, it shows the
>>>> heartbeat timeout error.
>>>>
>>>> Here I am testing in a multi-node cluster, and the Kafka server is
>>>> on a different node.
>>>>
>>>> On Thu, Jun 23, 2016 at 1:48 PM, cogumelosmaravilha
>>>> <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It happens to me too, but only when my Kafka is empty.
>>>>> I'm using Petrel because the generated jar file is really small:
>>>>> about 300 KB with Petrel vs. 16 MB with Streamparse.
>>>>>
>>>>> On 23-06-2016 08:05, ram kumar wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Versions:
>>>>> Storm: 0.10.0
>>>>> Streamparse: 2.1.4
>>>>>
>>>>> I am running a Storm topology with a Python streamparse "sparse run".
>>>>>
>>>>> The topology stops executing in the middle and throws an exception:
>>>>>
>>>>>> 158031 [pool-37-thread-1] ERROR b.s.s.ShellSpout - Halting process:
>>>>>> ShellSpout died.
>>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.10.0.jar:0.10.0]
>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_40]
>>>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_40]
>>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_40]
>>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_40]
>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_40]
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_40]
>>>>>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
>>>>>> 158036 [pool-37-thread-1] ERROR b.s.d.executor -
>>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>>>     (same stack trace as above)
>>>>>
>>>>> This occurs randomly, and I can't trace it back to the problem.
>>>>>
>>>>> Maybe the spout takes too long to process, so streamparse can't
>>>>> acknowledge the heartbeat in time.
>>>>>
>>>>> I changed "supervisor.worker.timeout.secs" from 30 to 600, but the
>>>>> topology still breaks. Are there any other options here?
>>>>>
>>>>> Thanks,
>>>>> Ram.
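The diagnosis upthread boils down to one pattern: in a multi-lang spout, nextTuple must return quickly even when Kafka is empty, or ShellSpout's heartbeat timer fires. A minimal sketch of that pattern, assuming a consumer whose consume() returns a message or None instead of blocking forever (pykafka's get_simple_consumer(consumer_timeout_ms=...) behaves this way); NonBlockingSpout and the injected emit callable are hypothetical names, not part of Petrel:

```python
class NonBlockingSpout(object):
    """Sketch of the pattern discussed upthread: nextTuple drains at most
    one message per call and returns immediately when the topic is empty,
    so the multi-lang heartbeat loop is never starved."""

    def __init__(self, consumer, emit):
        # `consumer` is any object whose consume() returns a message or
        # None without blocking forever -- e.g. a pykafka SimpleConsumer
        # built once with topic.get_simple_consumer(consumer_timeout_ms=500),
        # rather than a fresh consumer per nextTuple as in the sample above.
        # `emit` stands in for Petrel's storm.emit.
        self.consumer = consumer
        self.emit = emit

    def next_tuple(self):
        message = self.consumer.consume()  # returns None on timeout
        if message is not None:
            self.emit([message.value])  # hand at most one tuple per call
```

With this shape, an empty topic makes next_tuple a cheap no-op, so the Python side keeps answering ShellSpout's heartbeat in time instead of sitting inside a blocking `for message in consumer` loop.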

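One more note on the last question about other options: supervisor.worker.timeout.secs governs the worker's heartbeat to the supervisor, while the timeout in the stack trace above is the subprocess heartbeat enforced by ShellSpout, which is configured by topology.subprocess.timeout.secs. A hedged sketch of the override as a plain topology-conf dict; how the dict reaches Storm depends on your submitter (streamparse or Petrel options) and is not shown here:

```python
# Sketch: raise the multi-lang subprocess heartbeat timeout while
# debugging a slow spout. "topology.subprocess.timeout.secs" is the
# setting checked by ShellSpout's SpoutHeartbeatTimerTask;
# "supervisor.worker.timeout.secs" controls a different heartbeat.
topology_conf = {
    "topology.subprocess.timeout.secs": 120,  # seconds; default is 30
}
```

This only buys headroom for slow nextTuple calls; it does not address the STORM-1928 edge case above, where nextTuple is never invoked at all.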