FYI - I filed an issue for this here: https://issues.apache.org/jira/browse/STORM-1928

On Thu, Jun 23, 2016 at 11:33 PM, Jungtaek Lim <[email protected]> wrote:

> Does get_simple_consumer() block while waiting for messages? If it does,
> can we set a timeout on it?
>
> Btw, I found an edge case in ShellSpout (not sure if it matches your
> case): when nextTuple is not called for any reason (max spout pending,
> the backpressure feature introduced in 1.0.0, and so on), the multi-lang
> spout doesn't receive any message from ShellSpout and so sends nothing
> back, which means no heartbeat gets recorded.
>
> I'll file an issue and see if it's easy to fix.
>
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jun 23, 2016 at 10:59 PM, cogumelosmaravilha
> <[email protected]> wrote:
>
>> Python Petrel spout sample:
>>
>> from pykafka import KafkaClient
>> from petrel import storm
>> from petrel.emitter import Spout
>>
>> class MPDataSpoutInd(Spout):
>>     """Topology spout."""
>>
>>     def __init__(self):
>>         self.kafclient = KafkaClient(hosts="x.x.x.x:9092")
>>         super(MPDataSpoutInd, self).__init__(script=__file__)
>>
>>     @classmethod
>>     def declareOutputFields(cls):
>>         return ['x_id', 'y_id', 'z_id', 'date_stamp',
>>                 'time_stamp', 'a', 'b', 'c', 'date_start',
>>                 'time_start', 'users']
>>
>>     def initialize(self, conf, context):
>>         topic = self.kafclient.topics['individual_data']
>>         consumer = topic.get_simple_consumer()
>>         consumer.reset_offsets()
>>
>>     def nextTuple(self):
>>         topic = self.kafclient.topics['individual_data']
>>         consumer = topic.get_simple_consumer()
>>         for message in consumer:
>>             if message is not None:
>>                 x_id = message.value[0:24]
>>                 y_id = message.value[25:49]
>>                 z_id = message.value[50:74]
>>                 date_stamp = message.value[75:85]
>>                 time_stamp = message.value[86:98]
>>                 a = message.value[99:102]
>>                 b = message.value[103:106]
>>                 c = message.value[107:110]
>>                 start_date = message.value[111:121]
>>                 start_time = message.value[122:134]
>>                 users = message.value[135:138]
>>                 datetime_str = date_stamp + ' ' + time_stamp
>>                 datetime_start = start_date + ' ' + start_time
>>
>>                 if datetime_str > datetime_start:
>>                     storm.emit([x_id, y_id, z_id, date_stamp,
>>                                 time_stamp, str(a), str(b), str(c),
>>                                 start_date, start_time, str(users)])
>>
>> def run():
>>     """Entry point required by Petrel."""
>>     MPDataSpoutInd().run()
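
The spout above opens a fresh consumer on every nextTuple() call and then
iterates over it in an unbounded loop; on an idle topic that iteration blocks
inside nextTuple() indefinitely, which is exactly the blocking behavior
Jungtaek asks about. A minimal non-blocking sketch, assuming pykafka's
consumer_timeout_ms option and SimpleConsumer.consume(block=False) returning
None when nothing is buffered (the broker address and topic name are the
placeholders from the post, not real values):

    from pykafka import KafkaClient
    from petrel import storm
    from petrel.emitter import Spout

    class MPDataSpoutInd(Spout):
        """Non-blocking variant: one long-lived consumer, at most one
        message per nextTuple() call, so the heartbeat is never starved."""

        def __init__(self):
            self.kafclient = KafkaClient(hosts="x.x.x.x:9092")  # placeholder
            self.consumer = None
            super(MPDataSpoutInd, self).__init__(script=__file__)

        @classmethod
        def declareOutputFields(cls):
            return ['x_id', 'y_id', 'z_id', 'date_stamp',
                    'time_stamp', 'a', 'b', 'c', 'date_start',
                    'time_start', 'users']

        def initialize(self, conf, context):
            # Create the consumer once and keep it; consumer_timeout_ms
            # bounds how long any single blocking fetch may wait.
            topic = self.kafclient.topics['individual_data']
            self.consumer = topic.get_simple_consumer(consumer_timeout_ms=500)
            self.consumer.reset_offsets()

        def nextTuple(self):
            # Return immediately when nothing is available; Storm simply
            # calls nextTuple() again and the heartbeat keeps flowing.
            message = self.consumer.consume(block=False)
            if message is None:
                return
            v = message.value
            date_stamp, time_stamp = v[75:85], v[86:98]
            start_date, start_time = v[111:121], v[122:134]
            # Same filter as the original post: only emit current records.
            if date_stamp + ' ' + time_stamp > start_date + ' ' + start_time:
                storm.emit([v[0:24], v[25:49], v[50:74], date_stamp,
                            time_stamp, str(v[99:102]), str(v[103:106]),
                            str(v[107:110]), start_date, start_time,
                            str(v[135:138])])

    def run():
        """Entry point required by Petrel."""
        MPDataSpoutInd().run()
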
>>
>> On 23-06-2016 13:34, Jungtaek Lim wrote:
>>
>> Could you share the implementation of the spout?
>> In multi-lang, user-level functions shouldn't block, so a heartbeat
>> timeout will occur if your spout sends a request to Kafka and waits for
>> the response indefinitely.
>>
>> - Jungtaek Lim (HeartSaVioR)
>>
>> On Thu, Jun 23, 2016 at 7:25 PM, cogumelosmaravilha
>> <[email protected]> wrote:
>>
>>> Yes, but if you submit it to Storm it's OK, with or without an empty
>>> Kafka.
>>>
>>> On 23-06-2016 10:40, ram kumar wrote:
>>>
>>> Yes. The same topology works when I run it in a single-node cluster,
>>> and when there is no data to consume from Kafka it shows the heartbeat
>>> timeout error.
>>>
>>> Here I am testing in a multi-node cluster, and the Kafka server is on
>>> a different node.
>>>
>>> On Thu, Jun 23, 2016 at 1:48 PM, cogumelosmaravilha
>>> <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> It happens to me too, but only when my Kafka is empty.
>>>> I'm using Petrel because the generated jar file is really small:
>>>> about 300 KB with Petrel vs. 16 MB with Streamparse.
>>>>
>>>> On 23-06-2016 08:05, ram kumar wrote:
>>>>
>>>> Hi,
>>>>
>>>> Versions:
>>>> Storm: 0.10.0
>>>> Streamparse: 2.1.4
>>>>
>>>> I am running a Storm topology with Python streamparse ("sparse run").
>>>> The topology stops executing partway through and throws this
>>>> exception:
>>>>
>>>>> 158031 [pool-37-thread-1] ERROR b.s.s.ShellSpout - Halting process:
>>>>> ShellSpout died.
>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.10.0.jar:0.10.0]
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_40]
>>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_40]
>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_40]
>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_40]
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_40]
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_40]
>>>>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
>>>>> 158036 [pool-37-thread-1] ERROR b.s.d.executor -
>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>>     [same stack trace as above]
>>>>
>>>> This occurs randomly, and I haven't been able to trace the problem
>>>> back to its source.
>>>>
>>>> Maybe if the spout takes too long to process, streamparse can't
>>>> acknowledge the heartbeat in time?
>>>>
>>>> I changed "supervisor.worker.timeout.secs" from 30 to 600, but the
>>>> topology still breaks. Are there any other options here?
>>>>
>>>> Thanks,
>>>> Ram.
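
Jungtaek's opening question (does get_simple_consumer() block?) can be
checked outside Storm: by default a pykafka SimpleConsumer blocks until a
message arrives, while consumer_timeout_ms makes a fetch give up after that
much idle time. A quick standalone check, reusing the thread's placeholder
broker and topic:

    import time
    from pykafka import KafkaClient

    client = KafkaClient(hosts="x.x.x.x:9092")  # placeholder broker
    topic = client.topics['individual_data']

    # Without consumer_timeout_ms, consume() blocks until a message arrives;
    # with it, consume() returns None once the topic has been idle that long.
    consumer = topic.get_simple_consumer(consumer_timeout_ms=1000)

    start = time.time()
    message = consumer.consume()  # at most ~1s on an empty topic
    print("waited %.1fs, got: %r" % (time.time() - start, message))

On Ram's config workaround: the timeout in the stack trace comes from
ShellSpout's own subprocess heartbeat check, which in the 0.10.x line appears
to be governed by topology.subprocess.timeout.secs rather than
supervisor.worker.timeout.secs; that may explain why raising the latter alone
did not help (worth verifying against the Storm version in use).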
