FYI: I've filed the issue myself here:
https://issues.apache.org/jira/browse/STORM-1928

On Thu, Jun 23, 2016 at 11:33 PM, Jungtaek Lim <[email protected]> wrote:

> Does get_simple_consumer() block while waiting for messages? If so, can we
> set a timeout on it?
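To illustrate what "setting a timeout" buys here: a fetch that waits at most a bounded time and then returns None hands control back to the caller (the spout's nextTuple) instead of hanging. This is a minimal sketch that uses Python's standard `queue.Queue` as a stand-in for the Kafka topic, so it runs without a broker. In pykafka, the analogous knob is believed to be the `consumer_timeout_ms` argument of SimpleConsumer (default -1, i.e. wait indefinitely), which get_simple_consumer() forwards; that is an assumption worth checking against the pykafka docs for your version.

```python
import queue
import time
from typing import Optional


def poll_with_timeout(source: "queue.Queue[bytes]", timeout_s: float) -> Optional[bytes]:
    """Wait at most timeout_s seconds for one message.

    Returns the message, or None if nothing arrived in time, so the
    caller (e.g. a spout's nextTuple) regains control instead of
    blocking forever.
    """
    try:
        return source.get(timeout=timeout_s)
    except queue.Empty:
        return None


if __name__ == "__main__":
    # Hypothetical stand-in for a Kafka topic: an in-process queue.
    topic: "queue.Queue[bytes]" = queue.Queue()

    start = time.monotonic()
    print(poll_with_timeout(topic, 0.1))  # empty topic: None after roughly 0.1 s
    print(f"waited {time.monotonic() - start:.2f} s")

    topic.put(b"hello")
    print(poll_with_timeout(topic, 0.1))  # the queued message
```

The same idea applies whatever the client library: an empty topic should cost the spout a short, bounded wait, not an unbounded one.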
>
> By the way, I found an edge case in ShellSpout (not sure whether it matches
> your case): when nextTuple is not being called at all (because of max spout
> pending, the backpressure introduced in 1.0.0, and so on), the multi-lang
> spout doesn't receive any message from ShellSpout and therefore sends no
> messages back, so no heartbeat is recorded.
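The edge case can be illustrated with a simplified model. This is a hypothetical Python sketch, not Storm's ShellSpout code: it assumes the Java side records a heartbeat only when the subprocess replies to a nextTuple request, and that a timer fails the spout once the last heartbeat is older than the timeout. If nextTuple is never sent (max spout pending reached, backpressure), the subprocess never replies, so a naive check trips even though the subprocess is healthy. The `waiting_timed_out` variant is one possible alternative check, shown for comparison only.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class HeartbeatModel:
    """Simplified, hypothetical model of a ShellSpout-style heartbeat check.

    Times are plain numbers (seconds) so the model is deterministic.
    """
    timeout_s: float
    last_heartbeat: float = 0.0
    waiting_since: Optional[float] = None  # set while a request is outstanding

    def send_next_tuple(self, now: float) -> None:
        # The Java side asks the subprocess for the next tuple.
        self.waiting_since = now

    def receive_sync(self, now: float) -> None:
        # The subprocess answered; in this model, the only place a heartbeat is marked.
        self.last_heartbeat = now
        self.waiting_since = None

    def naive_timed_out(self, now: float) -> bool:
        # Trips whenever no reply was seen recently, even if nothing was asked.
        return now - self.last_heartbeat > self.timeout_s

    def waiting_timed_out(self, now: float) -> bool:
        # Alternative: only count time while a request is actually outstanding.
        return (self.waiting_since is not None
                and now - self.waiting_since > self.timeout_s)


if __name__ == "__main__":
    hb = HeartbeatModel(timeout_s=30)
    hb.send_next_tuple(now=0)
    hb.receive_sync(now=1)
    # nextTuple is then not called for 60 s (e.g. max spout pending reached).
    print(hb.naive_timed_out(now=61))    # True: spurious "heartbeat timeout"
    print(hb.waiting_timed_out(now=61))  # False: nothing was pending
```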
>
> I'll file an issue and see if it's easy to fix.
>
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jun 23, 2016 at 10:59 PM, cogumelosmaravilha <[email protected]> wrote:
>
>> Python Petrel spout sample:
>>
>> from pykafka import KafkaClient
>> from petrel import storm
>> from petrel.emitter import Spout
>>
>> class MPDataSpoutInd(Spout):
>>     """Topology Spout."""
>>     def __init__(self):
>>         self.kafclient = KafkaClient(hosts="x.x.x.x:9092")
>>         super(MPDataSpoutInd, self).__init__(script=__file__)
>>
>>     @classmethod
>>     def declareOutputFields(cls):
>>         return ['x_id', 'y_id', 'z_id', 'date_stamp',
>>                 'time_stamp', 'a', 'b', 'c', 'date_start',
>>                 'time_start', 'users']
>>
>>     def initialize(self, conf, context):
>>         topic = self.kafclient.topics['individual_data']
>>         consumer = topic.get_simple_consumer()
>>         consumer.reset_offsets()
>>
>>     def nextTuple(self):
>>         topic = self.kafclient.topics['individual_data']
>>         consumer = topic.get_simple_consumer()
>>         for message in consumer:
>>             if message is not None:
>>                 x_id = message.value[0:24]
>>                 y_id = message.value[25:49]
>>                 z_id = message.value[50:74]
>>                 date_stamp = message.value[75:85]
>>                 time_stamp = message.value[86:98]
>>                 a = message.value[99:102]
>>                 b = message.value[103:106]
>>                 c = message.value[107:110]
>>                 start_date = message.value[111:121]
>>                 start_time = message.value[122:134]
>>                 users = message.value[135:138]
>>                 datetime_str = date_stamp + ' ' + time_stamp
>>                 datetime_start = start_date + ' ' + start_time
>>
>>                 if datetime_str > datetime_start:
>>                     storm.emit([x_id, y_id, z_id, date_stamp,
>>                                 time_stamp, str(a), str(b), str(c),
>> start_date,
>>                                 start_time, str(users))
>>
>> def run():
>>     """Entry point required by Petrel."""
>>     MPDataSpoutInd().run()
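For comparison, here is a minimal sketch of a nextTuple body that handles at most one message and then returns. Note that a `for message in consumer:` loop keeps nextTuple running for as long as messages keep arriving (and, with no timeout, forever on an empty topic). Assumptions: the consumer is created once (e.g. in initialize) and exposes pykafka's `consume(block=False)`, assumed to return None when nothing is buffered; `emit` stands in for `storm.emit`; `FIELDS`, `parse_record`, `next_tuple`, `FakeMessage` and `FakeConsumer` are hypothetical names introduced here, with the slice offsets copied from the sample above. The fake consumer lets the sketch run without Kafka.

```python
from typing import Callable, List, Optional

# (field name, start, end) slice offsets copied from the spout sample.
FIELDS = [
    ("x_id", 0, 24), ("y_id", 25, 49), ("z_id", 50, 74),
    ("date_stamp", 75, 85), ("time_stamp", 86, 98),
    ("a", 99, 102), ("b", 103, 106), ("c", 107, 110),
    ("date_start", 111, 121), ("time_start", 122, 134),
    ("users", 135, 138),
]


def parse_record(value) -> List[str]:
    """Split one fixed-width record into the declared output fields."""
    if isinstance(value, bytes):  # Kafka values are typically bytes on Python 3
        value = value.decode("utf-8")
    return [value[start:end] for _, start, end in FIELDS]


def next_tuple(consumer, emit: Callable[[List[str]], None]) -> bool:
    """Handle at most one message, then return; True if a tuple was emitted."""
    message = consumer.consume(block=False)  # assumed: None if nothing buffered
    if message is None:
        return False
    v = dict(zip((name for name, _, _ in FIELDS), parse_record(message.value)))
    # Same string comparison as the original sample.
    if (v["date_stamp"] + " " + v["time_stamp"]
            > v["date_start"] + " " + v["time_start"]):
        emit([v[name] for name, _, _ in FIELDS])
        return True
    return False


class FakeMessage:
    """Test double with the one attribute next_tuple reads."""
    def __init__(self, value):
        self.value = value


class FakeConsumer:
    """Test double: consume() returns queued messages, then None."""
    def __init__(self, values):
        self._values = list(values)

    def consume(self, block: bool = True) -> Optional[FakeMessage]:
        return FakeMessage(self._values.pop(0)) if self._values else None
```

In a real Petrel spout, the consumer would be stored on `self` in initialize() and `next_tuple(self.consumer, storm.emit)` called from nextTuple(); that wiring is not shown here and is untested.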
>>
>>
>> On 23-06-2016 13:34, Jungtaek Lim wrote:
>>
>> Could you share the implementation of your spout?
>> In multi-lang, user-level functions shouldn't block, so a heartbeat timeout
>> will occur if your spout sends a request to Kafka and waits for the response
>> indefinitely.
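One common way to honour the "don't block" rule is to move the blocking read onto a background thread and have nextTuple only drain an in-memory buffer. This is a minimal sketch, assuming a hypothetical blocking `fetch` callable that stands in for the Kafka consumer; `PrefetchingReader` and `poll` are illustrative names, not Petrel, streamparse or pykafka API. Only the spout's own thread should emit, since the multi-lang protocol runs over the subprocess's stdin/stdout.

```python
import queue
import threading
import time
from typing import Callable, Optional


class PrefetchingReader:
    """Reads from a blocking source on a daemon thread.

    `fetch` is a hypothetical blocking callable (e.g. a wrapper around a
    Kafka consumer); poll() never blocks, so a spout's nextTuple can
    call it and return quickly.
    """

    def __init__(self, fetch: Callable[[], Optional[bytes]], maxsize: int = 1000):
        self._fetch = fetch
        self._buffer: "queue.Queue[bytes]" = queue.Queue(maxsize=maxsize)
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        while True:
            item = self._fetch()  # may block for a long time; fine on this thread
            if item is not None:
                self._buffer.put(item)  # waits when the buffer is full

    def poll(self) -> Optional[bytes]:
        """Return one buffered message, or None right away if there is none."""
        try:
            return self._buffer.get_nowait()
        except queue.Empty:
            return None


if __name__ == "__main__":
    source: "queue.Queue[bytes]" = queue.Queue()
    reader = PrefetchingReader(source.get)  # Queue.get blocks until an item exists
    print(reader.poll())  # None: nothing fetched yet, and no blocking
    source.put(b"msg-1")
    time.sleep(0.1)       # give the background thread a moment
    print(reader.poll())  # most likely b'msg-1' by now
```

The bounded buffer (`maxsize`) is a design choice: if nextTuple stops being called, the reader thread stalls instead of growing memory without limit.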
>>
>> - Jungtaek Lim (HeartSaVioR)
>>
>> On Thu, Jun 23, 2016 at 7:25 PM, cogumelosmaravilha <[email protected]> wrote:
>>
>>> Yes, but if you submit it to Storm, it works fine, whether or not Kafka is empty.
>>>
>>>
>>> On 23-06-2016 10:40, ram kumar wrote:
>>>
>>> Yes, the same topology works when I run it on a single-node cluster. And
>>> when there is no data to consume from Kafka, it shows the heartbeat timeout
>>> error.
>>>
>>> Here, I am testing on a multi-node cluster, and the Kafka server is on a
>>> different node.
>>>
>>> On Thu, Jun 23, 2016 at 1:48 PM, cogumelosmaravilha <
>>> [email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> It happens to me too, but only when my Kafka is empty.
>>>> I'm using Petrel because the generated JAR file is really small: about
>>>> 300 KB with Petrel vs. 16 MB with Streamparse.
>>>>
>>>>
>>>> On 23-06-2016 08:05, ram kumar wrote:
>>>>
>>>> Hi,
>>>>
>>>>
>>>> *Versions:*
>>>>     Storm : 0.10.0
>>>>     Streamparse : 2.1.4
>>>>
>>>>
>>>> I am running a Storm topology in Python with streamparse ("sparse run").
>>>>
>>>> The topology stops executing partway through and throws an exception:
>>>>
>>>>
>>>>> 158031 [pool-37-thread-1] ERROR b.s.s.ShellSpout - Halting process: ShellSpout died.
>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.10.0.jar:0.10.0]
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_40]
>>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_40]
>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_40]
>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_40]
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_40]
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_40]
>>>>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
>>>>> 158036 [pool-37-thread-1] ERROR b.s.d.executor -
>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>>     at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.10.0.jar:0.10.0]
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_40]
>>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_40]
>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_40]
>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_40]
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_40]
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_40]
>>>>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
>>>>>
>>>>
>>>>
>>>> This occurs randomly, and I haven't been able to trace the problem back to
>>>> its cause.
>>>>
>>>> Maybe if the spout takes too long to process, streamparse can't
>>>> acknowledge the heartbeat in time.
>>>>
>>>> I changed "supervisor.worker.timeout.secs" from 30 to 600.
>>>>
>>>> The topology still breaks. Are there any other options here?
>>>>
>>>>
>>>> Thanks,
>>>> Ram.
>>>>
>>>>
>>>>
>>>
>>>
>>
