FYI - I submitted a pull request for STORM-1928:
https://github.com/apache/storm/pull/1526

It's against the master branch, but since ShellSpout hasn't changed much
across minor versions, you can easily apply the patch to Storm 0.10.x and
build your own storm-core package if you want to evaluate it.

Hope this helps,

Thanks,
Jungtaek Lim (HeartSaVioR)


On Fri, Jun 24, 2016 at 12:04 AM, Jungtaek Lim <[email protected]> wrote:

> FYI - I filed an issue myself here:
> https://issues.apache.org/jira/browse/STORM-1928
>
> On Thu, Jun 23, 2016 at 11:33 PM, Jungtaek Lim <[email protected]> wrote:
>
>> Does get_simple_consumer() block while waiting for messages? If so, can we
>> set a timeout on it?
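A minimal sketch of what a timeout buys you, assuming a plain `queue.Queue` stands in for the Kafka consumer. pykafka's `get_simple_consumer()` appears to accept a `consumer_timeout_ms` argument (default -1, meaning wait indefinitely) that plays a similar role; verify this against your pykafka version. The helper `iter_with_timeout` is hypothetical: it stops iterating once no message arrives within the timeout, instead of blocking the caller forever.

```python
import queue
from typing import Iterator


def iter_with_timeout(source: "queue.Queue[str]", timeout_secs: float) -> Iterator[str]:
    """Yield messages until none arrives within timeout_secs, then stop.

    Hypothetical helper: it mimics a consumer timeout so that a loop over
    the messages ends instead of blocking forever on an empty topic.
    """
    while True:
        try:
            yield source.get(timeout=timeout_secs)
        except queue.Empty:
            return  # no message within the timeout: end the iteration


# Usage: two buffered messages are yielded, then the loop ends after ~50 ms.
messages: "queue.Queue[str]" = queue.Queue()
for m in ["m1", "m2"]:
    messages.put(m)
print(list(iter_with_timeout(messages, 0.05)))
```

With a timeout in place, a `for message in ...` loop inside `nextTuple` eventually returns even when the topic is empty.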
>>
>> By the way, I found an edge case in ShellSpout (not sure if it matches your
>> case): when nextTuple is never called (because of max spout pending, the
>> backpressure introduced in 1.0.0, and so on), the multi-lang spout doesn't
>> receive any messages from ShellSpout and therefore sends none back, so no
>> heartbeat is recorded.
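The edge case above can be modeled with a toy heartbeat monitor. This is a simplified Python model, not Storm's Java code: `HeartbeatMonitor` and its methods are hypothetical, and the fake clock only makes the timing deterministic. The idea is that the Java side records a heartbeat whenever the subprocess sends a message, and a timer fails the spout once the silence exceeds the timeout. If nextTuple is never sent, the subprocess has nothing to reply to, so the silence keeps growing.

```python
import time
from typing import Callable


class HeartbeatMonitor:
    """Toy model of a subprocess heartbeat check (hypothetical, simplified)."""

    def __init__(self, timeout_secs: float, clock: Callable[[], float] = time.monotonic):
        self.timeout_secs = timeout_secs
        self.clock = clock
        self.last_heartbeat = clock()

    def on_message_from_subprocess(self) -> None:
        # Any message from the multi-lang process counts as a heartbeat.
        self.last_heartbeat = self.clock()

    def check(self) -> None:
        # Run periodically by a timer; fails once the subprocess is silent too long.
        if self.clock() - self.last_heartbeat > self.timeout_secs:
            raise RuntimeError("subprocess heartbeat timeout")


# Scenario with a fake clock (seconds).
now = [0.0]
monitor = HeartbeatMonitor(timeout_secs=30, clock=lambda: now[0])
now[0] = 20.0
monitor.on_message_from_subprocess()  # subprocess replied to a nextTuple
now[0] = 45.0
monitor.check()                       # 25 s of silence: still within the timeout
now[0] = 55.0                         # nextTuple never sent (e.g. max spout pending)
try:
    monitor.check()                   # 35 s of silence: timeout
except RuntimeError as err:
    print(err)
```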
>>
>> I'll file an issue and see if it's easy to fix.
>>
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Thu, Jun 23, 2016 at 10:59 PM, cogumelosmaravilha <[email protected]> wrote:
>>
>>> Here is a sample Python Petrel spout:
>>>
>>> from pykafka import KafkaClient
>>> from petrel import storm
>>> from petrel.emitter import Spout
>>>
>>> class MPDataSpoutInd(Spout):
>>>     """Topology Spout."""
>>>     def __init__(self):
>>>         self.kafclient = KafkaClient(hosts="x.x.x.x:9092")
>>>         super(MPDataSpoutInd, self).__init__(script=__file__)
>>>
>>>     @classmethod
>>>     def declareOutputFields(cls):
>>>         return ['x_id', 'y_id', 'z_id', 'date_stamp',
>>>                 'time_stamp', 'a', 'b', 'c', 'date_start',
>>>                 'time_start', 'users']
>>>
>>>     def initialize(self, conf, context):
>>>         topic = self.kafclient.topics['individual_data']
>>>         # Note: this consumer is local and is discarded when initialize returns.
>>>         consumer = topic.get_simple_consumer()
>>>         consumer.reset_offsets()
>>>
>>>     def nextTuple(self):
>>>         topic = self.kafclient.topics['individual_data']
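>>>         # A new consumer is created on every call, and iterating it blocks
>>>         # while waiting for new messages, so nextTuple may never return.
>>>         consumer = topic.get_simple_consumer()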
>>>         for message in consumer:
>>>             if message is not None:
>>>                 x_id = message.value[0:24]
>>>                 y_id = message.value[25:49]
>>>                 z_id = message.value[50:74]
>>>                 date_stamp = message.value[75:85]
>>>                 time_stamp = message.value[86:98]
>>>                 a = message.value[99:102]
>>>                 b = message.value[103:106]
>>>                 c = message.value[107:110]
>>>                 start_date = message.value[111:121]
>>>                 start_time = message.value[122:134]
>>>                 users = message.value[135:138]
>>>                 datetime_str = date_stamp + ' ' + time_stamp
>>>                 datetime_start = start_date + ' ' + start_time
>>>
>>>                 if datetime_str > datetime_start:
>>>                     storm.emit([x_id, y_id, z_id, date_stamp,
>>>                                 time_stamp, str(a), str(b), str(c),
>>>                                 start_date, start_time, str(users)])
>>>
>>> def run():
>>>     """Entry point required by Petrel."""
>>>     MPDataSpoutInd().run()
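The fixed-width slicing in `nextTuple` can be pulled out into a pure function, which makes the offsets easy to test without Kafka or Storm. This is a sketch that assumes the layout implied by the slice offsets above (fields separated by single characters) and that `message.value` has already been decoded to `str` (under Python 3, pykafka returns bytes). `FIELDS`, `parse_record`, and `to_tuple` are hypothetical names.

```python
from typing import Dict, List, Optional

# Hypothetical layout inferred from the slice offsets in the spout above.
# Names and order follow declareOutputFields.
FIELDS = [
    ("x_id", 0, 24), ("y_id", 25, 49), ("z_id", 50, 74),
    ("date_stamp", 75, 85), ("time_stamp", 86, 98),
    ("a", 99, 102), ("b", 103, 106), ("c", 107, 110),
    ("date_start", 111, 121), ("time_start", 122, 134), ("users", 135, 138),
]


def parse_record(value: str) -> Dict[str, str]:
    """Slice one fixed-width record into named fields."""
    return {name: value[start:end] for name, start, end in FIELDS}


def to_tuple(value: str) -> Optional[List[str]]:
    """Return the values to emit, or None if the record should be skipped."""
    rec = parse_record(value)
    stamp = rec["date_stamp"] + " " + rec["time_stamp"]
    start = rec["date_start"] + " " + rec["time_start"]
    # String comparison is only valid for zero-padded, ISO-like date/time text.
    if stamp > start:
        return [rec[name] for name, _, _ in FIELDS]
    return None
```

Keeping the parsing separate also keeps `nextTuple` itself short, which matters because it should return quickly.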
>>>
>>>
>>> On 23-06-2016 13:34, Jungtaek Lim wrote:
>>>
>>> Could you share the implementation of your spout?
>>> In multi-lang, user-level functions shouldn't block, so a heartbeat timeout
>>> will occur if your spout sends a request to Kafka and waits indefinitely
>>> for the response.
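One way to follow that rule is to make each `nextTuple` call do a bounded amount of work and then return. Below is a minimal sketch that assumes a consumer whose `consume(block=False)` returns `None` when nothing is available. pykafka's `SimpleConsumer.consume` takes a `block` argument, but verify its behavior for your version. `FakeConsumer` and `next_tuple` are hypothetical stand-ins so the example runs without Kafka or Storm.

```python
import queue
from typing import Callable, Iterable, List, Optional


class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer."""

    def __init__(self, messages: Iterable[str]):
        self._pending: "queue.Queue[str]" = queue.Queue()
        for m in messages:
            self._pending.put(m)

    def consume(self, block: bool = True) -> Optional[str]:
        try:
            return self._pending.get(block=block)
        except queue.Empty:
            return None  # nothing buffered right now


def next_tuple(consumer, emit: Callable[[List[str]], None], max_batch: int = 10) -> int:
    """Emit at most max_batch messages without blocking, then return."""
    emitted = 0
    while emitted < max_batch:
        message = consumer.consume(block=False)
        if message is None:
            # Return promptly; the multi-lang wrapper can then reply "sync",
            # which keeps the Java side seeing the subprocess as alive.
            break
        emit([message])
        emitted += 1
    return emitted


out: List[List[str]] = []
consumer = FakeConsumer(["m1", "m2", "m3"])
print(next_tuple(consumer, out.append, max_batch=2))
print(next_tuple(consumer, out.append, max_batch=2))
print(next_tuple(consumer, out.append, max_batch=2))
```

The three calls emit 2, 1, and 0 messages. The last call returns immediately on an empty queue instead of waiting for Kafka.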
>>>
>>> - Jungtaek Lim (HeartSaVioR)
>>>
>>> On Thu, Jun 23, 2016 at 7:25 PM, cogumelosmaravilha <[email protected]> wrote:
>>>
>>>> Yes, but if you submit it to Storm, it works fine whether or not Kafka
>>>> is empty.
>>>>
>>>>
>>>> On 23-06-2016 10:40, ram kumar wrote:
>>>>
>>>> Yes, the same topology works when I run it on a single-node cluster.
>>>> When there is no data to consume from Kafka, it shows the heartbeat
>>>> timeout error.
>>>>
>>>> Here, I am testing on a multi-node cluster, and the Kafka server is on a
>>>> different node.
>>>>
>>>> On Thu, Jun 23, 2016 at 1:48 PM, cogumelosmaravilha <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It happens to me too, but only when my Kafka is empty.
>>>>> I'm using Petrel because the generated jar file is much smaller: about
>>>>> 300 KB with Petrel vs. 16 MB with Streamparse.
>>>>>
>>>>>
>>>>> On 23-06-2016 08:05, ram kumar wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>> *Version:*
>>>>>     Storm : 0.10.0
>>>>>     Streamparse : 2.1.4
>>>>>
>>>>>
>>>>> I am running a Python Storm topology with streamparse ("sparse run").
>>>>>
>>>>> The topology stops partway through execution and throws an exception:
>>>>>
>>>>>
>>>>> 158031 [pool-37-thread-1] ERROR b.s.s.ShellSpout - Halting process:
>>>>>> ShellSpout died.
>>>>>> java.lang.RuntimeException: *subprocess heartbeat timeout*
>>>>>>     at
>>>>>> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261)
>>>>>> [storm-core-0.10.0.jar:0.10.0]
>>>>>>     at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>> [?:1.8.0_40]
>>>>>>     at
>>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>> [?:1.8.0_40]
>>>>>>     at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>> [?:1.8.0_40]
>>>>>>     at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>> [?:1.8.0_40]
>>>>>>     at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>> [?:1.8.0_40]
>>>>>>     at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>> [?:1.8.0_40]
>>>>>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
>>>>>> 158036 [pool-37-thread-1] ERROR b.s.d.executor -
>>>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>>>     at
>>>>>> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261)
>>>>>> [storm-core-0.10.0.jar:0.10.0]
>>>>>>     at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>> [?:1.8.0_40]
>>>>>>     at
>>>>>> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>> [?:1.8.0_40]
>>>>>>     at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>> [?:1.8.0_40]
>>>>>>     at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>> [?:1.8.0_40]
>>>>>>     at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>> [?:1.8.0_40]
>>>>>>     at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>> [?:1.8.0_40]
>>>>>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
>>>>>>
>>>>>
>>>>>
>>>>> This occurs randomly, and I haven't been able to trace the cause.
>>>>>
>>>>> Maybe if the spout takes too long to process, streamparse can't
>>>>> acknowledge the heartbeat in time.
>>>>>
>>>>> I changed "supervisor.worker.timeout.secs" from 30 to 600, but the
>>>>> topology still breaks. Are there any other options here?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Ram.
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
