Does get_simple_consumer() block while waiting for messages? If so, can we set a timeout on it?
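One way to keep a multi-lang spout responsive is to have each nextTuple call take only what is already available and then return. The sketch below assumes pykafka's SimpleConsumer.consume(block=False), which returns None when nothing is queued. As far as we know, pykafka also accepts a consumer_timeout_ms keyword (default -1, meaning wait indefinitely) that get_simple_consumer() passes through to SimpleConsumer, so that iteration stops after that many milliseconds without messages. Check both against the pykafka version in use. To keep the sketch runnable without a broker, FakeConsumer is a hypothetical stand-in, and NonBlockingSpout, max_batch and max_seconds are illustrative names, not Petrel or streamparse APIs.

```python
import time
from collections import deque
from types import SimpleNamespace


class FakeConsumer:
    """Hypothetical stand-in for a pykafka SimpleConsumer.

    Only consume(block=...) is modelled: it returns the next queued message
    (an object with a .value attribute) or None when nothing is queued.
    It never waits, whatever `block` is set to.
    """

    def __init__(self, values):
        self._queue = deque(SimpleNamespace(value=v) for v in values)

    def consume(self, block=True):
        return self._queue.popleft() if self._queue else None


class NonBlockingSpout:
    """Illustrative spout whose next_tuple() always returns promptly."""

    def __init__(self, consumer, emit, max_batch=100, max_seconds=0.05):
        self.consumer = consumer        # created once, not on every call
        self.emit = emit                # e.g. storm.emit in a Petrel spout
        self.max_batch = max_batch      # cap on messages handled per call
        self.max_seconds = max_seconds  # cap on time spent per call

    def next_tuple(self):
        deadline = time.monotonic() + self.max_seconds
        emitted = 0
        while emitted < self.max_batch and time.monotonic() < deadline:
            message = self.consumer.consume(block=False)
            if message is None:
                break  # nothing available: return now; Storm will call again
            self.emit([message.value])
            emitted += 1
        return emitted


if __name__ == "__main__":
    out = []
    spout = NonBlockingSpout(FakeConsumer([b"m1", b"m2", b"m3"]), out.append, max_batch=2)
    print(spout.next_tuple(), spout.next_tuple(), spout.next_tuple())  # prints: 2 1 0
    print(out)
```

In a Petrel spout like the one quoted below, the consumer would be created once in initialize() and kept on self, and storm.emit would take the place of the emit callback. Returning early when the topic is empty is what lets the subprocess keep answering ShellSpout.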
Btw, I found an edge case in ShellSpout (not sure whether it matches your case): when nextTuple is not being called at all (because of max spout pending, the backpressure introduced in 1.0.0, and so on), the multi-lang spout doesn't receive any message from ShellSpout, so it sends no messages back, and therefore no heartbeat is recorded. I'll file an issue and see whether it's easy to fix.

Jungtaek Lim (HeartSaVioR)

On Thu, Jun 23, 2016 at 10:59 PM, cogumelosmaravilha wrote:

> Python Petrel spout sample:
>
> from pykafka import KafkaClient
> from petrel import storm
> from petrel.emitter import Spout
>
> class MPDataSpoutInd(Spout):
>     """Topology Spout."""
>     def __init__(self):
>         self.kafclient = KafkaClient(hosts="x.x.x.x:9092")
>         super(MPDataSpoutInd, self).__init__(script=__file__)
>
>     @classmethod
>     def declareOutputFields(cls):
>         return ['x_id', 'y_id', 'z_id', 'date_stamp',
>                 'time_stamp', 'a', 'b', 'c', 'date_start',
>                 'time_start', 'users']
>
>     def initialize(self, conf, context):
>         topic = self.kafclient.topics['individual_data']
>         consumer = topic.get_simple_consumer()
>         consumer.reset_offsets()
>
>     def nextTuple(self):
>         topic = self.kafclient.topics['individual_data']
>         consumer = topic.get_simple_consumer()
>         # With pykafka's defaults, this loop keeps waiting for new
>         # messages, so nextTuple does not return on its own.
>         for message in consumer:
>             if message is not None:
>                 x_id = message.value[0:24]
>                 y_id = message.value[25:49]
>                 z_id = message.value[50:74]
>                 date_stamp = message.value[75:85]
>                 time_stamp = message.value[86:98]
>                 a = message.value[99:102]
>                 b = message.value[103:106]
>                 c = message.value[107:110]
>                 start_date = message.value[111:121]
>                 start_time = message.value[122:134]
>                 users = message.value[135:138]
>                 datetime_str = date_stamp + ' ' + time_stamp
>                 datetime_start = start_date + ' ' + start_time
>
>                 if datetime_str > datetime_start:
>                     storm.emit([x_id, y_id, z_id, date_stamp,
>                                 time_stamp, str(a), str(b), str(c),
>                                 start_date,
>                                 start_time, str(users)])
>
> def run():
>     """Petrel must be."""
>     MPDataSpoutInd().run()
>
>
> On 23-06-2016 13:34, Jungtaek Lim wrote:
>
> Could you share the implementation of your spout?
> In multi-lang, user-level functions shouldn't block, so a heartbeat timeout
> will occur if your spout sends a request to Kafka and waits for the response
> indefinitely.
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jun 23, 2016 at 7:25 PM, cogumelosmaravilha wrote:
>
>> Yes, but if you submit to Storm, it's OK, with or without an empty Kafka.
>>
>>
>> On 23-06-2016 10:40, ram kumar wrote:
>>
>> Yes, the same topology works when I run it on a single-node cluster. And
>> when there is no data to consume from Kafka, it shows a heartbeat timeout
>> error.
>>
>> Here, I am testing on a multi-node cluster, and the Kafka server is on a
>> different node.
>>
>> On Thu, Jun 23, 2016 at 1:48 PM, cogumelosmaravilha wrote:
>>
>>> Hi,
>>>
>>> It happens to me too, but only when my Kafka is empty.
>>> I'm using Petrel because the generated jar file is really small: about
>>> 300k with Petrel vs 16M with Streamparse.
>>>
>>>
>>> On 23-06-2016 08:05, ram kumar wrote:
>>>
>>> Hi,
>>>
>>> Version:
>>> Storm: 0.10.0
>>> Streamparse: 2.1.4
>>>
>>> I am running a Storm topology in Python with streamparse ("sparse run").
>>>
>>> This topology stops executing in the middle and throws an exception:
>>>
>>>> 158031 [pool-37-thread-1] ERROR b.s.s.ShellSpout - Halting process: ShellSpout died.
>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.10.0.jar:0.10.0]
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_40]
>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_40]
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_40]
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_40]
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_40]
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_40]
>>>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
>>>> 158036 [pool-37-thread-1] ERROR b.s.d.executor -
>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.10.0.jar:0.10.0]
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_40]
>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_40]
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_40]
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_40]
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_40]
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_40]
>>>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
>>>
>>> This occurs randomly, and I haven't been able to trace it back to the cause.
>>>
>>> Maybe if the spout takes too long to process, streamparse can't
>>> acknowledge the heartbeat in time.
>>>
>>> I changed "supervisor.worker.timeout.secs" from 30 to 600, but the
>>> topology still breaks. Are there any other options here?
>>>
>>> Thanks,
>>> Ram.
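To see why raising the timeout did not help, here is a simplified model of the check behind "subprocess heartbeat timeout". It assumes, following the explanation earlier in the thread, that ShellSpout only records a heartbeat when it receives a message from the subprocess, and that a timer periodically compares the time since the last heartbeat with the configured timeout. HeartbeatMonitor and first_failure are hypothetical names; this is a model of the behaviour described, not Storm's implementation.

```python
class HeartbeatMonitor:
    """Hypothetical model of a parent-side heartbeat check (not Storm's code)."""

    def __init__(self, timeout_secs, now=0.0):
        self.timeout_secs = timeout_secs
        self.last_heartbeat = now

    def on_message_from_subprocess(self, now):
        # Assumption: any message from the subprocess counts as a heartbeat.
        self.last_heartbeat = now

    def check(self, now):
        # Run periodically by a timer; True means the subprocess is declared dead.
        return now - self.last_heartbeat > self.timeout_secs


def first_failure(timeout_secs, reply_times, horizon, tick=1.0):
    """Return the first timer tick at which the check fails, or None."""
    monitor = HeartbeatMonitor(timeout_secs)
    replies = sorted(reply_times)
    i = 0
    t = 0.0
    while t <= horizon:
        # Deliver every subprocess message sent up to this tick.
        while i < len(replies) and replies[i] <= t:
            monitor.on_message_from_subprocess(replies[i])
            i += 1
        if monitor.check(t):
            return t
        t += tick
    return None


if __name__ == "__main__":
    # The spout replies during the first 5 seconds, then nextTuple blocks forever.
    print(first_failure(30, [1, 2, 3, 4, 5], horizon=1000))   # 36.0
    print(first_failure(600, [1, 2, 3, 4, 5], horizon=1000))  # 606.0
```

With the spout silent after t = 5 s, a 30 s timeout trips at t = 36 and a 600 s timeout at t = 606: a larger value only delays the halt while nextTuple never returns. A spout that keeps replying every second never trips the check.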
