Python Petrel Spout sample:

from pykafka import KafkaClient
from petrel import storm
from petrel.emitter import Spout


class MPDataSpoutInd(Spout):
    """Topology Spout."""

    def __init__(self):
        self.kafclient = KafkaClient(hosts="x.x.x.x:9092")
        super(MPDataSpoutInd, self).__init__(script=__file__)

    @classmethod
    def declareOutputFields(cls):
        return ['x_id', 'y_id', 'z_id', 'date_stamp',
                'time_stamp', 'a', 'b', 'c', 'date_start',
                'time_start', 'users']

    def initialize(self, conf, context):
        # Reset the consumer offsets for the topic at startup.
        topic = self.kafclient.topics['individual_data']
        consumer = topic.get_simple_consumer()
        consumer.reset_offsets()

    def nextTuple(self):
        # A new consumer is created on every call, and the loop
        # below iterates over it for as long as it yields messages.
        topic = self.kafclient.topics['individual_data']
        consumer = topic.get_simple_consumer()
        for message in consumer:
            if message is not None:
                # Each record is fixed-width; fields are one character apart.
                x_id = message.value[0:24]
                y_id = message.value[25:49]
                z_id = message.value[50:74]
                date_stamp = message.value[75:85]
                time_stamp = message.value[86:98]
                a = message.value[99:102]
                b = message.value[103:106]
                c = message.value[107:110]
                start_date = message.value[111:121]
                start_time = message.value[122:134]
                users = message.value[135:138]
                datetime_str = date_stamp + ' ' + time_stamp
                datetime_start = start_date + ' ' + start_time
                # Emit only records stamped after their start time.
                if datetime_str > datetime_start:
                    storm.emit([x_id, y_id, z_id, date_stamp,
                                time_stamp, str(a), str(b), str(c),
                                start_date,
                                start_time, str(users)])


def run():
    """Petrel must be."""
    MPDataSpoutInd().run()

On 23-06-2016 13:34, Jungtaek Lim wrote:
Could you share the implementation of your spout?
In multi-lang, user-level functions shouldn't block, so a heartbeat
timeout will occur if your spout sends a request to Kafka and waits
for the response indefinitely.
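
To illustrate the point about not blocking, here is a minimal sketch of a next-tuple step that handles at most one message and then returns. It is not the Petrel or streamparse API: FakeConsumer, next_tuple, emit and idle_sleep are hypothetical names made up for this example, and FakeConsumer only imitates the shape of a non-blocking call such as pykafka's consume(block=False), which returns None when nothing is available.

```python
import time
from collections import deque


class FakeConsumer(object):
    """Hypothetical stand-in for a Kafka consumer, used only in this sketch.

    It imitates a non-blocking consume call: return a message if one
    is ready, otherwise return None immediately.
    """

    def __init__(self, values):
        self._queue = deque(values)

    def consume(self, block=True):
        if self._queue:
            return self._queue.popleft()
        return None  # nothing available; do not wait


def next_tuple(consumer, emit, idle_sleep=0.01):
    """Handle at most one message, then return to the caller."""
    message = consumer.consume(block=False)
    if message is None:
        time.sleep(idle_sleep)  # brief pause to avoid a busy loop
        return False
    emit([message])
    return True


emitted = []
consumer = FakeConsumer(["a", "b"])
results = [next_tuple(consumer, emitted.append) for _ in range(3)]
print(results)  # [True, True, False]
print(emitted)  # [['a'], ['b']]
```

The third call finds the queue empty and still returns after a short sleep, so the calling loop gets control back even when there is no data.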
- Jungtaek Lim (HeartSaVioR)
On Thu, Jun 23, 2016 at 7:25 PM, cogumelosmaravilha
<[email protected]> wrote:
Yes, but if you submit it to Storm, it's OK, whether or not Kafka
is empty.
On 23-06-2016 10:40, ram kumar wrote:
Yes. The same topology works when I run it in a single-node
cluster. And when there is no data to consume from Kafka, it shows
the heartbeat timeout error.
Here, I am testing in a multi-node cluster, and the Kafka server is
on a different node.
On Thu, Jun 23, 2016 at 1:48 PM, cogumelosmaravilha
<[email protected]> wrote:
Hi,
It happens to me too, but only when my Kafka is empty.
I'm using Petrel because the generated jar file is really
small, around 300k with Petrel vs 16M with Streamparse.
On 23-06-2016 08:05, ram kumar wrote:
Hi,
Version:
Storm : 0.10.0
Streamparse : 2.1.4
I am running a Storm topology written in Python with streamparse,
using "sparse run".
This topology stops executing partway through and throws an
exception:
158031 [pool-37-thread-1] ERROR b.s.s.ShellSpout - Halting process: ShellSpout died.
java.lang.RuntimeException: subprocess heartbeat timeout
    at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.10.0.jar:0.10.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_40]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_40]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_40]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_40]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_40]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_40]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
158036 [pool-37-thread-1] ERROR b.s.d.executor -
java.lang.RuntimeException: subprocess heartbeat timeout
    at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.10.0.jar:0.10.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_40]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_40]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_40]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_40]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_40]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_40]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
This occurs randomly, and I am not able to trace the cause.
Maybe if the spout takes too long to process, streamparse
can't acknowledge the heartbeat in time.
I changed "supervisor.worker.timeout.secs" from 30 to 600, but
the topology still breaks. Are there any other options here?
Thanks,
Ram.