Do get_simple_consumer() blocks for receiving messages? If it is, can we
set timeout on this?

Btw, I found edge-case from ShellSpout (not sure if it represents your
case) :
When nextTuple is not calling at any chances (max spout pending,
backpressure introduced at 1.0.0, and so on) multi-lang spout don't receive
any message from ShellSpout thus no messages from multi-lang spout side, so
no heartbeat is being marked.

I'll file an issue and see if it's easy to fix.

Jungtaek Lim (HeartSaVioR)

2016년 6월 23일 (목) 오후 10:59, cogumelosmaravilha <[email protected]>님이
작성:

> Python Petrel Spout sample;
>
> from pykafka import KafkaClient
> from petrel import storm
> from petrel.emitter import Spout
>
> class MPDataSpoutInd(Spout):
>     """Topology Spout."""
>     def __init__(self):
>         self.kafclient = KafkaClient(hosts="x.x.x.x:9092")
>         super(MPDataSpoutInd, self).__init__(script=__file__)
>
>     @classmethod
>     def declareOutputFields(cls):
>         return ['x_id', 'y_id', 'z_id', 'date_stamp',
>                 'time_stamp', 'a', 'b', 'c', 'date_start',
>                 'time_start', 'users']
>
>     def initialize(self, conf, context):
>         topic = self.kafclient.topics['individual_data']
>         consumer = topic.get_simple_consumer()
>         consumer.reset_offsets()
>
>     def nextTuple(self):
>         topic = self.kafclient.topics['individual_data']
>         consumer = topic.get_simple_consumer()
>         for message in consumer:
>             if message is not None:
>                 x_id = message.value[0:24]
>                 y_id = message.value[25:49]
>                 z_id = message.value[50:74]
>                 date_stamp = message.value[75:85]
>                 time_stamp = message.value[86:98]
>                 a = message.value[99:102]
>                 b = message.value[103:106]
>                 c = message.value[107:110]
>                 start_date = message.value[111:121]
>                 start_time = message.value[122:134]
>                 users = message.value[135:138]
>                 datetime_str = date_stamp + ' ' + time_stamp
>                 datetime_start = start_date + ' ' + start_time
>
>                 if datetime_str > datetime_start:
>                     storm.emit([x_id, y_id, z_id, date_stamp,
>                                 time_stamp, str(a), str(b), str(c),
> start_date,
>                                 start_time, str(users))
>
> def run():
>     """Petrel must be."""
>     MPDataSpoutInd().run()
>
>
> On 23-06-2016 13:34, Jungtaek Lim wrote:
>
> Could you share implementation of spout?
> In multi-lang user level functions shouldn't block, so heartbeat timeout
> will occur if your spout requests to Kafka and wait for response infinitely.
>
> - Jungtaek Lim (HeartSaVioR)
>
> 2016년 6월 23일 (목) 오후 7:25, cogumelosmaravilha <[email protected]>
> 님이 작성:
>
>> Yes, but if you submit to Storm, it's ok, with or without an empty Kafka.
>>
>>
>> On 23-06-2016 10:40, ram kumar wrote:
>>
>> Yes, The same topology, when I run in single node cluster, it works. And
>> when there is no data to consume from kafka, it shows heartbeat timeout
>> error.
>>
>> Here, I am testing in multi node cluster and kafka server is in different
>> node
>>
>> On Thu, Jun 23, 2016 at 1:48 PM, cogumelosmaravilha <
>> [email protected]> wrote:
>>
>>> Hi,
>>>
>>> It happens to me to, but only when my kafka is empy.
>>> I'm using Petrel because the generated jar file is really small like
>>> 300k Petrel vs 16M Streamparse.
>>>
>>>
>>> On 23-06-2016 08:05, ram kumar wrote:
>>>
>>> Hi,
>>>
>>>
>>> *Version: *
>>>     Storm : 0.10.0
>>>     Streamparse : 2.1.4
>>>
>>>
>>> I am running a storm topology with a python streamparse "sparse run".
>>>
>>> This topology stops executing in the middle and throw an exception
>>>
>>>
>>> 158031 [pool-37-thread-1] ERROR b.s.s.ShellSpout - Halting process:
>>>> ShellSpout died.
>>>> java.lang.RuntimeException: *subprocess heartbeat timeout*
>>>>     at
>>>> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261)
>>>> [storm-core-0.10.0.jar:0.10.0]
>>>>     at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> [?:1.8.0_40]
>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>> [?:1.8.0_40]
>>>>     at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>> [?:1.8.0_40]
>>>>     at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>> [?:1.8.0_40]
>>>>     at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> [?:1.8.0_40]
>>>>     at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> [?:1.8.0_40]
>>>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
>>>> 158036 [pool-37-thread-1] ERROR b.s.d.executor -
>>>> java.lang.RuntimeException: subprocess heartbeat timeout
>>>>     at
>>>> backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261)
>>>> [storm-core-0.10.0.jar:0.10.0]
>>>>     at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> [?:1.8.0_40]
>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>> [?:1.8.0_40]
>>>>     at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>> [?:1.8.0_40]
>>>>     at
>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>> [?:1.8.0_40]
>>>>     at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> [?:1.8.0_40]
>>>>     at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> [?:1.8.0_40]
>>>>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
>>>>
>>>
>>>
>>> This occurs randomly
>>> I can't able to trace back to the problem
>>>
>>> Maybe if spout takes too long to process, then streamparse can't
>>> acknowledge the heartbeat in time
>>>
>>> changed "supervisor.worker.timeout.secs" from 30 to 600
>>>
>>> Still the topology breaks. Is there any other options here?
>>>
>>>
>>> Thanks,
>>> Ram.
>>>
>>>
>>>
>>
>>
>

Reply via email to