Python Petrel spout sample:

from pykafka import KafkaClient
from petrel import storm
from petrel.emitter import Spout

class MPDataSpoutInd(Spout):
    """Topology Spout."""
    def __init__(self):
        self.kafclient = KafkaClient(hosts="x.x.x.x:9092")
        super(MPDataSpoutInd, self).__init__(script=__file__)

    @classmethod
    def declareOutputFields(cls):
        return ['x_id', 'y_id', 'z_id', 'date_stamp',
                'time_stamp', 'a', 'b', 'c', 'date_start',
                'time_start', 'users']

    def initialize(self, conf, context):
        topic = self.kafclient.topics['individual_data']
        consumer = topic.get_simple_consumer()
        consumer.reset_offsets()

    def nextTuple(self):
        topic = self.kafclient.topics['individual_data']
        consumer = topic.get_simple_consumer()
        for message in consumer:
            if message is not None:
                x_id = message.value[0:24]
                y_id = message.value[25:49]
                z_id = message.value[50:74]
                date_stamp = message.value[75:85]
                time_stamp = message.value[86:98]
                a = message.value[99:102]
                b = message.value[103:106]
                c = message.value[107:110]
                start_date = message.value[111:121]
                start_time = message.value[122:134]
                users = message.value[135:138]
                datetime_str = date_stamp + ' ' + time_stamp
                datetime_start = start_date + ' ' + start_time

                if datetime_str > datetime_start:
                    storm.emit([x_id, y_id, z_id, date_stamp,
                                time_stamp, str(a), str(b), str(c), start_date,
                                start_time, str(users)])

def run():
    """Petrel must be."""
    MPDataSpoutInd().run()
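
For comparison, here is a minimal sketch of a non-blocking variant, following the advice quoted below that multi-lang user functions should not block. It is not tested against a real cluster. It assumes the consumer behaves like a pykafka SimpleConsumer, whose consume(block=False) returns None when no message is waiting. The names parse_record, next_tuple_once, FIELDS and idle_sleep are hypothetical helpers introduced only for this illustration, and emit stands in for storm.emit.

```python
import time

# Fixed-width layout copied from the slices used in the spout above:
# (field name, start, end), with one separator character between fields.
FIELDS = [
    ("x_id", 0, 24), ("y_id", 25, 49), ("z_id", 50, 74),
    ("date_stamp", 75, 85), ("time_stamp", 86, 98),
    ("a", 99, 102), ("b", 103, 106), ("c", 107, 110),
    ("start_date", 111, 121), ("start_time", 122, 134),
    ("users", 135, 138),
]


def parse_record(value):
    """Slice one fixed-width record into a dict keyed by field name."""
    if isinstance(value, bytes):
        # Assumption: payloads are UTF-8 text.
        value = value.decode("utf-8")
    return {name: value[start:end] for name, start, end in FIELDS}


def next_tuple_once(consumer, emit, idle_sleep=0.1):
    """Handle at most one message, then return to the caller.

    Returning quickly lets the multi-lang wrapper answer heartbeats even
    when the topic is empty. Returns True if a message was consumed.
    """
    message = consumer.consume(block=False)
    if message is None:
        time.sleep(idle_sleep)  # short pause so an empty topic is not a busy loop
        return False
    rec = parse_record(message.value)
    stamp = rec["date_stamp"] + " " + rec["time_stamp"]
    start = rec["start_date"] + " " + rec["start_time"]
    if stamp > start:  # same string comparison as the original spout
        emit([rec[name] for name, _, _ in FIELDS])
    return True
```

In a spout, the consumer would be created once in initialize() and kept on self, and nextTuple() would reduce to a single call such as next_tuple_once(self.consumer, storm.emit).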

On 23-06-2016 13:34, Jungtaek Lim wrote:
Could you share your spout implementation?
In multi-lang, user-level functions shouldn't block, so a heartbeat timeout will occur if your spout sends a request to Kafka and waits indefinitely for a response.

- Jungtaek Lim (HeartSaVioR)

On Thu, Jun 23, 2016 at 7:25 PM, cogumelosmaravilha <[email protected]> wrote:

    Yes, but if you submit the topology to Storm, it works fine,
    whether or not Kafka is empty.


    On 23-06-2016 10:40, ram kumar wrote:
    Yes. The same topology works when I run it on a single-node
    cluster. But when there is no data to consume from Kafka, it shows
    the heartbeat timeout error.

    Here, I am testing on a multi-node cluster, and the Kafka server is
    on a different node.

    On Thu, Jun 23, 2016 at 1:48 PM, cogumelosmaravilha
    <[email protected]> wrote:

        Hi,

        It happens to me too, but only when my Kafka is empty.
        I'm using Petrel because the generated jar file is really
        small: about 300k with Petrel vs. 16M with Streamparse.


        On 23-06-2016 08:05, ram kumar wrote:
        Hi,

        Version:
            Storm : 0.10.0
            Streamparse : 2.1.4


        I am running a Storm topology with Python streamparse, using
        "sparse run".
        The topology stops executing partway through and throws an
        exception:

            158031 [pool-37-thread-1] ERROR b.s.s.ShellSpout - Halting process: ShellSpout died.
            java.lang.RuntimeException: subprocess heartbeat timeout
                at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.10.0.jar:0.10.0]
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_40]
                at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_40]
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_40]
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_40]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_40]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_40]
                at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
            158036 [pool-37-thread-1] ERROR b.s.d.executor -
            java.lang.RuntimeException: subprocess heartbeat timeout
                at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.10.0.jar:0.10.0]
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_40]
                at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_40]
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_40]
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_40]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_40]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_40]
                at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]



        This occurs randomly, and I am unable to trace it back to a
        cause.

        Maybe if the spout takes too long to process, streamparse
        can't acknowledge the heartbeat in time.

        I changed "supervisor.worker.timeout.secs" from 30 to 600, but
        the topology still breaks. Are there any other options here?


        Thanks,
        Ram.



