Re: Native Streaming Window API

2016-09-14 Thread Satish Duggana
Hi,

You can look at the example code at
https://raw.githubusercontent.com/apache/storm/master/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java
and for trident at
https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java

The link below covers windowing and state checkpointing in Storm in detail,
with some examples.

https://community.hortonworks.com/articles/14171/windowing-and-state-checkpointing-in-apache-storm.html
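
For quick reference, below is a minimal sketch of the windowed-bolt pattern those examples use: a bolt extending BaseWindowedBolt, configured with a sliding count window. The spout name and the "value" field are illustrative assumptions, not taken from the linked code.

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.topology.base.BaseWindowedBolt.Count;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

// Sums the "value" field over a sliding window of 30 tuples that advances
// every 10 tuples.
public class SlidingSumBolt extends BaseWindowedBolt {
    private transient OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow window) {
        int sum = 0;
        for (Tuple t : window.get()) {           // all tuples in the current window
            sum += t.getIntegerByField("value");
        }
        collector.emit(new Values(sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sum"));
    }
}

// Wiring: window length of 30 tuples, sliding interval of 10 tuples.
//   builder.setBolt("slidingsum",
//           new SlidingSumBolt().withWindow(new Count(30), new Count(10)), 1)
//          .shuffleGrouping("integer");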


Thanks,
Satish.

On Thu, Sep 15, 2016 at 6:20 AM, Koji Ishida  wrote:

> Hi T.I.
>
> >Any chance you want to make it helpful for the entire world, rather than
> just my Japanese friends ;) ?
>
> OK.
>
> My peers and I have now started translating
> my post for the entire world, so please wait a bit :)
>
> Thanks,
> Koji.
>
> On Wed, Sep 14, 2016 at 13:58, Tech Id wrote:
>
>> Thank you Koji for the links.
>>
>> We do not understand Japanese :( but it seems your blog post is really
>> useful (just looking at the code and logs).
>> Any chance you want to make it helpful for the entire world, rather than
>> just my Japanese friends ;) ?
>>
>> On Tue, Sep 13, 2016 at 6:21 PM, Koji Ishida  wrote:
>>
>>> Hello, T.I.
>>>
>>> Documentation for the Native Streaming API now exists on the Apache Storm
>>> site, under Windowing.
>>>
>>> http://storm.apache.org/releases/1.0.2/Windowing.html
>>>
>>> It explains the specification of the Native Streaming API.
>>>
>>>
>>> If you or your peers can understand Japanese, you can read my post. I
>>> wrote an explanation of the basic Native Streaming API examples.
>>>
>>> http://qiita.com/kojiisd/items/1618cf8d79bb5ed995d3
>>>
>>> Regards,
>>> Koji Ishida
>>>
>>>
>>> On Wednesday, September 14, 2016, Tech Id wrote:
>>>
>>> Hi,

 I want to use the Native Streaming Window API introduced in Storm 1.0.0

 Is there any documentation on it, along with a high-level
 design or an example?

 Thanks
 T.I.


>>


Re: Native Streaming Window API

2016-09-14 Thread Koji Ishida
Hi T.I.

>Any chance you want to make it helpful for the entire world, rather than
just my Japanese friends ;) ?

OK.

My peers and I have now started translating
my post for the entire world, so please wait a bit :)

Thanks,
Koji.

On Wed, Sep 14, 2016 at 13:58, Tech Id wrote:

> Thank you Koji for the links.
>
> We do not understand Japanese :( but it seems your blog post is really
> useful (just looking at the code and logs).
> Any chance you want to make it helpful for the entire world, rather than
> just my Japanese friends ;) ?
>
> On Tue, Sep 13, 2016 at 6:21 PM, Koji Ishida  wrote:
>
>> Hello, T.I.
>>
>> Documentation for the Native Streaming API now exists on the Apache Storm
>> site, under Windowing.
>>
>> http://storm.apache.org/releases/1.0.2/Windowing.html
>>
>> It explains the specification of the Native Streaming API.
>>
>>
>> If you or your peers can understand Japanese, you can read my post. I
>> wrote an explanation of the basic Native Streaming API examples.
>>
>> http://qiita.com/kojiisd/items/1618cf8d79bb5ed995d3
>>
>> Regards,
>> Koji Ishida
>>
>>
>> On Wednesday, September 14, 2016, Tech Id wrote:
>>
>> Hi,
>>>
>>> I want to use the Native Streaming Window API introduced in Storm 1.0.0
>>>
>>> Is there any documentation on it, along with a high-level
>>> design or an example?
>>>
>>> Thanks
>>> T.I.
>>>
>>>
>


[ANNOUNCE] Apache Storm 0.10.2 Released

2016-09-14 Thread P. Taylor Goetz
The Apache Storm community is pleased to announce the release of Apache Storm 
version 0.10.2.

Storm is a distributed, fault-tolerant, and high-performance realtime 
computation system that provides strong guarantees on the processing of data. 
You can read more about Storm on the project website:

http://storm.apache.org

Downloads of source and binary distributions are listed in our download
section:

http://storm.apache.org/downloads.html

You can read more about this release in the following blog post:

https://storm.apache.org/2016/09/14/storm0102-released.html

Distribution artifacts are available in Maven Central at the following 
coordinates:

groupId: org.apache.storm
artifactId: storm-core
version: 0.10.2
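
As a Maven dependency entry, those coordinates look like this (the provided scope is a common choice, since the cluster supplies storm-core at runtime):

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.10.2</version>
  <scope>provided</scope>
</dependency>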

The full list of changes is available here[1]. Please let us know [2] if you 
encounter any problems.

Regards,

The Apache Storm Team

[1]: https://github.com/apache/storm/blob/v0.10.2/CHANGELOG.md
[2]: https://issues.apache.org/jira/browse/STORM




batch processing in storm

2016-09-14 Thread Shushant Arora
Hi

Is Storm a right fit for batch processing, say, processing data from a
Kafka source every 10 minutes?
Do we need to write separate code for Storm stream vs. batch applications?

Thanks


Re: Re: How will storm replay the tuple tree?

2016-09-14 Thread S G
Very nice discussion !

I have also been wanting to see a feature similar to the one in Ravi's
comment above:

"*There is one thing i am looking forward from Storm is to inform Spout
about what kind of failure it was*. i.e. if it was ConnectionTimeout or
ReadTimeout etc, that means if i retry it may pass. But say it was null
pointer exception(java world) , i know the data which is being expected is
not there and my code is not handling that scenario, so either i will have
to change code or ask data provider to send that field, but retry wont help
me."


I think we need to:

1) Add a new method *failWithoutRetry(Tuple, Exception)* to the collector.
2) Provide the ability to *configure a dead-letter data-store in the spout*
for failed messages reported by #1 above.

The configurable data-store should support Kafka, Solr and Redis to begin
with (plus the option to implement one's own and drop a jar file into the
classpath).

Such a feature would benefit all the spouts.

Benefits:
1) Topologies will not get stuck replaying the same doomed-to-fail tuples.
2) Users can set alerts on dead letters and easily find the actual problems
in their topologies, rather than analyzing all failed tuples only to find
that they failed because of a temporary network glitch.
3) Since the entire Tuple is put into the dead letter, all the data is
available for retrying after fixing the topology code.
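
To make the proposal concrete, here is a rough, hypothetical sketch of what the pluggable store and collector method could look like; none of these names exist in Storm today.

import org.apache.storm.tuple.Tuple;

// Hypothetical pluggable sink for permanently failed tuples (proposal sketch,
// not a real Storm interface); implementations could target Kafka, Solr or Redis.
public interface DeadLetterStore {
    void store(Tuple failedTuple, Exception cause);
}

// Hypothetical collector method from point 1: ack the tuple so it is never
// replayed, and hand it to the configured dead-letter store with its cause.
//   void failWithoutRetry(Tuple tuple, Exception cause) {
//       deadLetterStore.store(tuple, cause);
//       ack(tuple);   // removes it from the pending tuple tree, so no replay
//   }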

Thx,
SG



On Wed, Sep 14, 2016 at 7:25 AM, Hart, James W.  wrote:

> In my testing, when a tuple was replayed by a spout, every kafka message
> from the replayed one to the end was replayed.  That's why all bolts need
> to be idempotent, so that replays do not cause work to be done twice.  I
> think it has to do with kafka tracking the offset of the last acked message
> in a topic, not the actual ack of every message individually.  This is a
> simplistic view, as it's a lot more complicated than this.
>
>
>
> If anybody can confirm this, please respond, as it was a surprise to me and
> cost me a couple of days of testing when I encountered it.
>
>
>
>
>
> *From:* fanxi...@travelsky.com [mailto:fanxi...@travelsky.com]
> *Sent:* Tuesday, September 13, 2016 9:22 PM
>
> *To:* user
> *Subject:* Re: Re: How will storm replay the tuple tree?
>
>
>
> Yes, only the failed tuple is replayed, but the whole batch will be held.
>
>
>
> So, if the tuple fails forever, the batch will be held forever?
>
>
>
> I am just not clear whether the tuple itself or the batch that owns the
> tuple will be held in the spout.
>
>
>
>
> --
>
> Josh
>
>
>
>
>
>
>
> *From:* Ambud Sharma 
>
> *Date:* 2016-09-14 09:10
>
> *To:* user 
>
> *Subject:* Re: Re: How will storm replay the tuple tree?
>
> No as per the code only individual messages are replayed.
>
>
>
> On Sep 13, 2016 6:09 PM, "fanxi...@travelsky.com" 
> wrote:
>
> Hi:
>
>
>
> I'd like to make clear on something about Kafka-spout referring to ack.
>
>
>
> For example, kafka-spout fetches offsets 5000-6000 from the Kafka server, but
> one tuple whose offset is 5101 is failed by a bolt. The whole batch of
> 5000-6000 will remain in kafka-spout until the 5101 tuple is acked.
> If the 5101 tuple cannot be acked for a long time, the batch 5000-6000
> will remain for a long time, and the kafka-spout will stop fetching data
> from kafka during this time.
>
>
>
> Am I right?
>
>
>
>
> --
>
> Josh
>
>
>
>
>
>
>
> *From:* Tech Id 
>
> *Date:* 2016-09-14 06:26
>
> *To:* user 
>
> *Subject:* Re: How will storm replay the tuple tree?
>
> I agree with this statement about code/architecture but in case of some
> system outages, like one of the end-points (Solr, Couchbase, Elastic-Search
> etc.) being down temporarily, a very large number of other fully-functional
> and healthy systems will receive a large number of duplicate replays
> (especially in heavy throughput topologies).
>
>
>
> If you can elaborate a little more on the performance cost of tracking
> tuples or point to a document reflecting the same, that will be of great
> help.
>
>
>
> Best,
>
> T.I.
>
>
>
> On Tue, Sep 13, 2016 at 12:26 PM, Hart, James W.  wrote:
>
> Failures should be very infrequent; if they are not, then rethink the code
> and architecture.  The performance cost of tracking tuples in the way that
> would be required to replay at the failure point is large; basically that
> method would slow everything way down for very infrequent failures.
>
>
>
> *From:* S G [mailto:sg.online.em...@gmail.com]
> *Sent:* Tuesday, September 13, 2016 3:17 PM
> *To:* user@storm.apache.org
> *Subject:* Re: How will storm replay the tuple tree?
>
>
>
> Hi,
>
>
>
> I am a little curious to know why we begin at the spout level for case 1.
>
> If we replay at the failing bolt's parent level (BoltA in this case), then
> it should be more performant due to a decrease in duplicate processing (as
> compared to whole tuple tree replays).
>
>
>
> If BoltA crashes due to some reason while replaying, only then the Spout
> should receive this as

Kafka spout not working

2016-09-14 Thread Paritosh Anand
Hi,

I am not able to submit a topology that reads messages from kafka. Below is
the error trace

Storm version - 1.0.2
Kafka version - 0.10.0.0

17286 [SyncThread:0] ERROR o.a.s.s.o.a.z.s.NIOServerCnxn - Unexpected
Exception:
java.nio.channels.CancelledKeyException
at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73) ~[?:1.8.0_102]
at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77) ~[?:1.8.0_102]
at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxn.sendBuffer(NIOServerCnxn.java:151) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxn.sendResponse(NIOServerCnxn.java:1081) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.shade.org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:404) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.shade.org.apache.zookeeper.server.SyncRequestProcessor.flush(SyncRequestProcessor.java:200) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.shade.org.apache.zookeeper.server.SyncRequestProcessor.run(SyncRequestProcessor.java:131) [storm-core-1.0.2.jar:1.0.2]

Can anyone point out what the issue might be here?


Re: storm-kafka-client 1.1.0 tag - Multiple Spouts Eventually Freeze

2016-09-14 Thread James Tyack
Zach,
We have been seeing a similar issue with storm kafka since upgrade and
posted about it earlier this week. We produced tests and opened pull
request for feedback on what we saw.
https://github.com/apache/storm/pull/1679
Regards
James
On Sep 12, 2016 12:03 PM, "Zach Schoenberger"  wrote:

> Hi all,
>
> I am working with the storm-kafka-client and am seeing some odd behavior.
> Whenever I set up a topology with the storm-kafka-client with 2 tasks, 2
> workers, and parallelism hint of 2, I see that the spout will eventually
> only poll 0 events from kafka, even when I have verified that there are new
> events flowing in. The number of partitions is 180 for the topic. The kafka
> version I am using is 0.10.1.
>
> Any help/insight would be greatly appreciated.
>
> Thanks,
> Zach
>


Re: Fwd: Serialize error when storm supervisor starting executor

2016-09-14 Thread Jianfei Li
Hi, Ambud,

Thanks for your reply.

I'm not using Maven to build my project. I tried excluding the conflicting
classes when packaging the jar, but the worker on the supervisor then
complains about a NoClassDefFoundError, even though the class exists in one
of the jars in the worker's classpath.

On Wed, Sep 14, 2016 at 7:04 AM, Ambud Sharma 
wrote:

> Can you post the snippet of your pom.xml file, especially around where
> storm-core is imported?
>
> I suspect you are not excluding dependencies explicitly if there is a
> conflict in Maven.
>
> What is serialized is your bolt instance, so you need to either have
> serializable objects or mark them transient and instantiate them in the
> prepare method.
>
> Thanks,
> Ambud
>
> On Sep 12, 2016 11:39 AM, "Jianfei Li"  wrote:
>
>>
>> Hi,
>>
>> I got an InvalidClassException in the supervisor after uploading the jars
>> to Storm using the *"storm jar"* command. The exception is as below:
>>
>> 2016-09-06 21:49:32.649 org.apache.storm.daemon.executor [INFO][main] 
>> Loading executor __acker:[8 8]
>> 2016-09-06 21:49:32.661 org.apache.storm.daemon.executor [INFO][main] Loaded 
>> executor tasks __acker:[8 8]
>> 2016-09-06 21:49:32.672 org.apache.storm.daemon.executor [INFO][main] 
>> Timeouts disabled for executor __acker:[8 8]
>> 2016-09-06 21:49:32.673 org.apache.storm.daemon.executor [INFO][main] 
>> Finished loading executor __acker:[8 8]
>> 2016-09-06 21:49:32.688 org.apache.storm.daemon.executor [INFO][main] 
>> Loading executor EventDistributionBolt:[2 2]
>> 2016-09-06 21:49:32.711 org.apache.storm.daemon.executor [INFO][main] Loaded 
>> executor tasks EventDistributionBolt:[2 2]
>> 2016-09-06 21:49:32.713 org.apache.storm.daemon.executor [INFO][main] 
>> Finished loading executor EventDistributionBolt:[2 2]
>> 2016-09-06 21:49:32.728 org.apache.storm.daemon.executor [INFO][main] 
>> Loading executor SubscriptionFinderBolt:[6 6]
>> 2016-09-06 21:49:32.735 org.apache.storm.daemon.executor [INFO][main] Loaded 
>> executor tasks SubscriptionFinderBolt:[6 6]
>> 2016-09-06 21:49:32.742 org.apache.storm.daemon.executor [INFO][main] 
>> Finished loading executor SubscriptionFinderBolt:[6 6]
>> 2016-09-06 21:49:32.753 org.apache.storm.daemon.executor [INFO][main] 
>> Loading executor __system:[-1 -1]
>> 2016-09-06 21:49:32.755 org.apache.storm.daemon.executor [INFO][main] Loaded 
>> executor tasks __system:[-1 -1]
>> 2016-09-06 21:49:32.757 org.apache.storm.daemon.executor [INFO][main] 
>> Finished loading executor __system:[-1 -1]
>> 2016-09-06 21:49:32.768 org.apache.storm.daemon.executor [INFO][main] 
>> Loading executor 
>> __metricscom.xxx.alp.storm.metric.ServiceLogMetricsConsumer:[10 10]
>> 2016-09-06 21:49:32.771 org.apache.storm.daemon.executor [INFO][main] Loaded 
>> executor tasks 
>> __metricscom..alp.storm.metric.ServiceLogMetricsConsumer:[10 10]
>> 2016-09-06 21:49:32.777 org.apache.storm.daemon.executor [INFO][main] 
>> Finished loading executor 
>> __metricscom..alp.storm.metric.ServiceLogMetricsConsumer:[10 10]
>> 2016-09-06 21:49:32.789 org.apache.storm.daemon.executor [INFO][main] 
>> Loading executor EventStreamSpout:[4 4]
>> 2016-09-06 21:49:32.807 org.apache.storm.messaging.netty.Client 
>> [ERROR][client-boss-1] connection attempt 2 to 
>> Netty-Client-alp-storm-supervisor-2a-i-f478ab2f..com/:6700 
>> failed: java.net.ConnectException: Connection refused: 
>> alp-storm-supervisor-2a-i-f478ab2f..com/:6700
>> 2016-09-06 
>>  
>> 21:49:32.936 org.apache.storm.daemon.worker [ERROR][main] Error on 
>> initialization of server mk-worker
>> java.lang.RuntimeException: 
>> java.io.InvalidClassException: 
>> com.fasterxml.jackson.databind.deser.BasicDeserializerFactory; local class 
>> incompatible: stream classdesc serialVersionUID = -1426550576764902820, 
>> local class serialVersionUID = 2445376702910286321
>> at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:179) 
>> ~[storm-core-1.0.x.jar:?]
>> at 
>> org.apache.storm.utils.Utils.getSetComponentObject(Utils.java:430) 
>> ~[storm-core-1.0.x.jar:?]
>> at org.apache.storm.daemon.task$get_task_object.invoke(task.clj:74) 
>> ~[storm-core-1.0.x.jar:?]
>> at 
>> org.apache.storm.daemon.task$mk_task_data$fn__5625.invoke(task.clj:177) 
>> ~[storm-core-1.0.x.jar:?]
>> at org.apache.storm.util$assoc_apply_self.invoke(util.clj:930) 
>> ~[storm-core-1.0.x.jar:?]
>> at org.apache.storm.daemon.task$mk_task_data.invoke(task.clj:170) 
>> ~[storm-core-1.0.x.jar:?]
>> at ..
>>
>> From the exception, I figured out it was because the version of the
>> jackson-databind classes in the uploaded jar was not the same as the
>> jackson-databind version the supervisor uses. I then made them the same and
>> re-uploaded the jar. The executor on the supervisor node started without
>> issue this time.
>>
>> Based on this issue, my questions are:
>>
>>1. Do I have to mak

RE: Re: How will storm replay the tuple tree?

2016-09-14 Thread Hart, James W.
In my testing, when a tuple was replayed by a spout, every kafka message from
the replayed one to the end was replayed.  That's why all bolts need to be
idempotent, so that replays do not cause work to be done twice.  I think it has
to do with kafka tracking the offset of the last acked message in a topic, not
the actual ack of every message individually.  This is a simplistic view, as
it's a lot more complicated than this.

If anybody can confirm this, please respond, as it was a surprise to me and
cost me a couple of days of testing when I encountered it.


From: fanxi...@travelsky.com [mailto:fanxi...@travelsky.com]
Sent: Tuesday, September 13, 2016 9:22 PM
To: user
Subject: Re: Re: How will storm replay the tuple tree?

Yes, only the failed tuple is replayed, but the whole batch will be held.

So, if the tuple fails forever, the batch will be held forever?

I am just not clear whether the tuple itself or the batch that owns the tuple
will be held in the spout.



Josh



From: Ambud Sharma
Date: 2016-09-14 09:10
To: user
Subject: Re: Re: How will storm replay the tuple tree?

No as per the code only individual messages are replayed.

On Sep 13, 2016 6:09 PM, "fanxi...@travelsky.com" wrote:
Hi:

I'd like to make clear on something about Kafka-spout referring to ack.

For example, kafka-spout fetches offsets 5000-6000 from the Kafka server, but one
tuple whose offset is 5101 is failed by a bolt. The whole batch of 5000-6000
will remain in kafka-spout until the 5101 tuple is acked. If the 5101 tuple
cannot be acked for a long time, the batch 5000-6000 will remain for a long
time, and the kafka-spout will stop fetching data from kafka during this time.

Am I right?



Josh



From: Tech Id
Date: 2016-09-14 06:26
To: user
Subject: Re: How will storm replay the tuple tree?
I agree with this statement about code/architecture but in case of some system 
outages, like one of the end-points (Solr, Couchbase, Elastic-Search etc.) 
being down temporarily, a very large number of other fully-functional and 
healthy systems will receive a large number of duplicate replays (especially in 
heavy throughput topologies).

If you can elaborate a little more on the performance cost of tracking tuples 
or point to a document reflecting the same, that will be of great help.

Best,
T.I.

On Tue, Sep 13, 2016 at 12:26 PM, Hart, James W. wrote:
Failures should be very infrequent; if they are not, then rethink the code and
architecture.  The performance cost of tracking tuples in the way that would be
required to replay at the failure point is large; basically that method would
slow everything way down for very infrequent failures.

From: S G [mailto:sg.online.em...@gmail.com]
Sent: Tuesday, September 13, 2016 3:17 PM
To: user@storm.apache.org
Subject: Re: How will storm replay the tuple tree?

Hi,

I am a little curious to know why we begin at the spout level for case 1.
If we replay at the failing bolt's parent level (BoltA in this case), then it 
should be more performant due to a decrease in duplicate processing (as 
compared to whole tuple tree replays).

If BoltA crashes for some reason while replaying, only then should the Spout
receive this as a failure, and the whole tuple tree should be replayed.

This saving in duplicate processing will be more visible with several layers of 
bolts.

I am sure there is a good reason to replay the whole tuple tree, and I want to
know what it is.

Thanks
SG

On Tue, Sep 13, 2016 at 10:22 AM, P. Taylor Goetz wrote:
Hi Cheney,

Replays happen at the spout level. So if there is a failure at any point in the 
tuple tree (the tuple tree being the anchored emits, unanchored emits don’t 
count), the original spout tuple will be replayed. So the replayed tuple will 
traverse the topology again, including unanchored points.

If an unanchored tuple fails downstream, it will not trigger a replay.
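
For reference, the difference between the two cases in bolt code, using Storm's standard OutputCollector API (the transformed value is a placeholder):

import org.apache.storm.task.OutputCollector;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class EmitExamples {
    // Called from a bolt's execute() method.
    static void emitBoth(OutputCollector collector, Tuple input, Object transformed) {
        // Anchored emit: the new tuple is tied to `input` in the tuple tree,
        // so a downstream failure replays the original spout tuple.
        collector.emit(input, new Values(transformed));

        // Unanchored emit: not part of any tuple tree; a downstream failure
        // of this tuple triggers no replay.
        collector.emit(new Values(transformed));

        collector.ack(input);
    }
}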

Hope this helps.

-Taylor


On Sep 13, 2016, at 4:42 AM, Cheney Chen wrote:

Hi there,

We're using storm 1.0.1, and I'm checking through 
http://storm.apache.org/releases/1.0.1/Guaranteeing-message-processing.html

I've got questions about the two scenarios below.
Assume topology: S (spout) --> BoltA --> BoltB
1. S: anchored emit, BoltA: anchored emit
Suppose BoltB processing failed w/ ack: what will the replay be? Will it
execute both BoltA and BoltB, or only the failed BoltB processing?

2. S: anchored emit, BoltA: unanchored emit
Suppose BoltB processing failed w/ ack: replay will not happen, correct?

--
Regards,
Qili Chen (Cheney)

E-mail: tbcql1...@gmail.com
MP: (+1) 4086217503





Re: storm-kafka-client 1.1.0 tag - Multiple Spouts Eventually Freeze

2016-09-14 Thread Ambud Sharma
I have seen that behavior only when running Storm in local mode with no data
flowing in.

This sounds like it might have something to do with zookeeper, as in: your
offsets in zookeeper are either not updated, or the watches are not being
triggered for the spout to consume.

Try using the zkCli and manually ls the consumer-group znodes and cat them
to see what data is there.

Thanks,
Ambud

On Sep 12, 2016 12:03 PM, "Zach Schoenberger"  wrote:

> Hi all,
>
> I am working with the storm-kafka-client and am seeing some odd behavior.
> Whenever I set up a topology with the storm-kafka-client with 2 tasks, 2
> workers, and parallelism hint of 2, I see that the spout will eventually
> only poll 0 events from kafka, even when I have verified that there are new
> events flowing in. The number of partitions is 180 for the topic. The kafka
> version I am using is 0.10.1.
>
> Any help/insight would be greatly appreciated.
>
> Thanks,
> Zach
>


Re: Fwd: Serialize error when storm supervisor starting executor

2016-09-14 Thread Ambud Sharma
Can you post the snippet of your pom.xml file, especially around where
storm-core is imported?

I suspect you are not excluding dependencies explicitly if there is a
conflict in Maven.

What is serialized is your bolt instance, so you need to either have
serializable objects or mark them transient and instantiate them in the
prepare method.
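
A small sketch of that pattern, using a Jackson ObjectMapper purely as an example of a non-serializable member (apt here, given the jackson-databind clash in this thread):

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ParseBolt extends BaseRichBolt {
    // transient: excluded from the serialized topology, so the worker's own
    // jackson-databind version is used instead of the client's.
    private transient ObjectMapper mapper;
    private transient OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.mapper = new ObjectMapper();   // created on the worker, not the client
    }

    @Override
    public void execute(Tuple input) {
        try {
            Map parsed = mapper.readValue(input.getString(0), Map.class);
            collector.emit(input, new Values(parsed));
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("parsed"));
    }
}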

Thanks,
Ambud

On Sep 12, 2016 11:39 AM, "Jianfei Li"  wrote:

>
> Hi,
>
> I got an InvalidClassException in the supervisor after uploading the jars to
> Storm using the *"storm jar"* command. The exception is as below:
>
> 2016-09-06 21:49:32.649 org.apache.storm.daemon.executor [INFO][main] Loading 
> executor __acker:[8 8]
> 2016-09-06 21:49:32.661 org.apache.storm.daemon.executor [INFO][main] Loaded 
> executor tasks __acker:[8 8]
> 2016-09-06 21:49:32.672 org.apache.storm.daemon.executor [INFO][main] 
> Timeouts disabled for executor __acker:[8 8]
> 2016-09-06 21:49:32.673 org.apache.storm.daemon.executor [INFO][main] 
> Finished loading executor __acker:[8 8]
> 2016-09-06 21:49:32.688 org.apache.storm.daemon.executor [INFO][main] Loading 
> executor EventDistributionBolt:[2 2]
> 2016-09-06 21:49:32.711 org.apache.storm.daemon.executor [INFO][main] Loaded 
> executor tasks EventDistributionBolt:[2 2]
> 2016-09-06 21:49:32.713 org.apache.storm.daemon.executor [INFO][main] 
> Finished loading executor EventDistributionBolt:[2 2]
> 2016-09-06 21:49:32.728 org.apache.storm.daemon.executor [INFO][main] Loading 
> executor SubscriptionFinderBolt:[6 6]
> 2016-09-06 21:49:32.735 org.apache.storm.daemon.executor [INFO][main] Loaded 
> executor tasks SubscriptionFinderBolt:[6 6]
> 2016-09-06 21:49:32.742 org.apache.storm.daemon.executor [INFO][main] 
> Finished loading executor SubscriptionFinderBolt:[6 6]
> 2016-09-06 21:49:32.753 org.apache.storm.daemon.executor [INFO][main] Loading 
> executor __system:[-1 -1]
> 2016-09-06 21:49:32.755 org.apache.storm.daemon.executor [INFO][main] Loaded 
> executor tasks __system:[-1 -1]
> 2016-09-06 21:49:32.757 org.apache.storm.daemon.executor [INFO][main] 
> Finished loading executor __system:[-1 -1]
> 2016-09-06 21:49:32.768 org.apache.storm.daemon.executor [INFO][main] Loading 
> executor __metricscom.xxx.alp.storm.metric.ServiceLogMetricsConsumer:[10 10]
> 2016-09-06 21:49:32.771 org.apache.storm.daemon.executor [INFO][main] Loaded 
> executor tasks 
> __metricscom..alp.storm.metric.ServiceLogMetricsConsumer:[10 10]
> 2016-09-06 21:49:32.777 org.apache.storm.daemon.executor [INFO][main] 
> Finished loading executor 
> __metricscom..alp.storm.metric.ServiceLogMetricsConsumer:[10 10]
> 2016-09-06 21:49:32.789 org.apache.storm.daemon.executor [INFO][main] Loading 
> executor EventStreamSpout:[4 4]
> 2016-09-06 21:49:32.807 org.apache.storm.messaging.netty.Client 
> [ERROR][client-boss-1] connection attempt 2 to 
> Netty-Client-alp-storm-supervisor-2a-i-f478ab2f..com/:6700 
> failed: java.net.ConnectException: Connection refused: 
> alp-storm-supervisor-2a-i-f478ab2f..com/:6700
> 2016-09-06 
>  
> 21:49:32.936 org.apache.storm.daemon.worker [ERROR][main] Error on 
> initialization of server mk-worker
> java.lang.RuntimeException: 
> java.io.InvalidClassException: 
> com.fasterxml.jackson.databind.deser.BasicDeserializerFactory; local class 
> incompatible: stream classdesc serialVersionUID = -1426550576764902820, local 
> class serialVersionUID = 2445376702910286321
> at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:179) 
> ~[storm-core-1.0.x.jar:?]
> at org.apache.storm.utils.Utils.getSetComponentObject(Utils.java:430) 
> ~[storm-core-1.0.x.jar:?]
> at org.apache.storm.daemon.task$get_task_object.invoke(task.clj:74) 
> ~[storm-core-1.0.x.jar:?]
> at 
> org.apache.storm.daemon.task$mk_task_data$fn__5625.invoke(task.clj:177) 
> ~[storm-core-1.0.x.jar:?]
> at org.apache.storm.util$assoc_apply_self.invoke(util.clj:930) 
> ~[storm-core-1.0.x.jar:?]
> at org.apache.storm.daemon.task$mk_task_data.invoke(task.clj:170) 
> ~[storm-core-1.0.x.jar:?]
> at ..
>
> From the exception, I figured out it was because the version of the
> jackson-databind classes in the uploaded jar was not the same as the
> jackson-databind version the supervisor uses. I then made them the same and
> re-uploaded the jar. The executor on the supervisor node started without
> issue this time.
>
> Based on this issue, my questions are:
>
>    1. Do I have to make all the shared classes between the supervisor and
>    the uploaded jar the same version? This seems impossible, since once I
>    upload new jars containing a new version, or deploy a new version of a
>    class to the supervisor, the issue above would happen again.
>    2. What is the serialization/deserialization mechanism Storm uses when
>    starting an executor inside the worker process? What would it
>    serialize/deserialize?
>3. If the answer to the

Re: Join stream with historical data

2016-09-14 Thread Ambud Sharma
Yes, you can build something for data enrichment, as long as you use some
sort of LRU cache on the bolt that is fairly sizable and your event volume
is reasonable, to make sure there won't be a bottleneck in the topology.
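
A minimal sketch of such a cache, using Java's LinkedHashMap in access order; the db.lookup call below is a placeholder for the real database client:

import java.util.LinkedHashMap;
import java.util.Map;

// Simple LRU cache: evicts the least recently accessed entry once `capacity`
// is exceeded.
public class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    public LruCache(int capacity) {
        super(16, 0.75f, true);   // accessOrder = true: get() refreshes recency
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity;
    }
}

// In the enrichment bolt, hit the database only on a cache miss:
//   V enriched = cache.computeIfAbsent(key, k -> db.lookup(k));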

On Sep 13, 2016 10:43 AM, "Daniela S"  wrote:

> Dear all,
>
> is it possible to add some kind of "historical data" to a data stream in
> Storm? I would like to look up an entry in a database for every tuple when
> it arrives in Storm, and to join the values of this entry with the tuple to
> proceed with processing.
>
> Thank you in advance.
>
> Best regards,
> Daniela
>


Re: How will storm replay the tuple tree?

2016-09-14 Thread Ambud Sharma
Two things here, extending what Ravi talked about:

1. You fail tuples either explicitly or they time out, as an indicator of a
recoverable issue in the topology.

If the error is not recoverable, don't fail the tuple; ack it and forward
the error to another bolt so you can record it somewhere for further
investigation, like Kafka (we have a topic in Kafka for this).

2. Real-time processing means you have to worry about latencies at the
nanosecond level at times, which means a fail-fast strategy must be used.
Point-to-point failure handling at the granularity of a single tuple can be
implemented using transactions with a batch size of 1. This will slow down
the topology substantially; you can try an implementation yourself and see.

The XOR-based tuple tree is a genius innovation from Nathan Marz that does
tuple tracking very, very fast while using predictable memory. So regardless
of how many hops your tuple has to go through, Storm uses 20 bytes to track
it.
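
For intuition, here is a tiny self-contained illustration of that XOR trick (not Storm's actual acker code): each edge id is XORed into a 64-bit ledger once when the edge is created and once when it is acked, so the ledger returns to zero exactly when the whole tree has been acked.

import java.util.Random;

public class XorLedgerDemo {
    public static void main(String[] args) {
        Random rand = new Random();
        long[] edgeIds = new long[5];
        for (int i = 0; i < edgeIds.length; i++) {
            edgeIds[i] = rand.nextLong();       // random 64-bit edge ids
        }

        long ledger = 0L;
        for (long id : edgeIds) ledger ^= id;   // each anchored emit XORs its id in
        for (long id : edgeIds) ledger ^= id;   // each ack XORs the same id out

        System.out.println(ledger == 0L);       // true: the tuple tree is fully acked
    }
}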



Now, about exactly-once processing: there is no such thing as exactly-once
processing unless you use transactions with a batch size of 1 (including
Trident).

What topology developers should focus on is idempotent processing!

What does that mean? Idempotent processing means that if your tuple were
replayed, the result would not change. So if you are using Trident
micro-batching, or you wrote your own micro-batching in Storm, the net
result is: in case of failures your tuples will replay, but you are okay
with that since the net result will be the same.

With Trident, the next batch will not be processed until the current one
is processed. This means the entire batch has to be handled via rollback
transactions (as in, you flush to the db at the end of the batch) or,
better, written to the db in an idempotent manner where each tuple has an
id such that writing it again just rewrites the same info.

Most modern data stores have the concept of a key which can be used, e.g.
an Elasticsearch document id, an HBase row key, a MySQL primary key, etc.

Now, how do you get the UUID of the tuple?
1. Handle it in your application logic if you already know what constitutes
a unique event.
2. Worry about it from Kafka onwards (we do this): use partition id +
offset + event timestamp (inside the event payload) as the UUID.
3. MD5 the payload of the event (there is a risk of collision here,
depending on your event volume and application logic).
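
For illustration, options 2 and 3 as a small helper class; the names here are assumptions, not Storm or Kafka API:

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public final class EventIds {
    private EventIds() {}

    // Option 2: a stable id from the Kafka coordinates plus the event timestamp.
    public static String fromKafka(int partition, long offset, long eventTsMillis) {
        return partition + "-" + offset + "-" + eventTsMillis;
    }

    // Option 3: an MD5 content hash of the payload (collision risk depends on
    // event volume and application logic, as noted above).
    public static String md5(byte[] payload) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            StringBuilder sb = new StringBuilder();
            for (byte b : md.digest(payload)) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is guaranteed on the JVM
        }
    }
}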

For things like unique counting, you can use an in-memory approach like we
did (Hendrix) or use something like Redis with structures like sets and
HyperLogLog.

Thanks,
Ambud

On Sep 14, 2016 1:38 AM, "Cheney Chen"  wrote:

> Thank you guys for the discussion.
>
> What if I want exactly-once processing for all nodes (bolts), even when
> failures happen? Will Trident be the one?
>
> On Wed, Sep 14, 2016 at 3:49 PM, Ravi Sharma  wrote:
>
>> Hi T.I.
>> A few things on why the Spout is responsible for replay rather than the
>> various Bolts:
>>
>> 1. Ack and fail messages carry only the message ID. Usually your spout
>> generates the message ID and knows what tuple/message is linked to it (via
>> the source, i.e. JMS etc.). If an ack or fail happens, the Spout can do
>> various things, like deleting from the queue on ack or putting the message
>> in some dead-letter queue on fail. An intermediate Bolt won't know what
>> message it sent, unless you implement something of your own. Technically
>> you can put the delete-message-from-JMS logic in bolts, but then your whole
>> topology knows where you are getting data from; what if tomorrow you start
>> processing data from JMS, an HTTP REST service, a database, a file system,
>> etc.?
>>
>> 2. BoltB fails and tells BoltA; BoltA retries 3 times, and it fails 3
>> times. Now what should BoltA do? Send it to another bolt (say a BoltPreA
>> exists between it and the spout), or send it to the Spout?
>>         If it sends it to BoltPreA, that means BoltPreA will retry 3
>> times (just using the number 3; consider it N), which means for each try
>> by BoltPreA, BoltA will retry again 3 times, so 9 retries in total.
>> (Basically, with TB bolts from the Spout to the failing Bolt and TR
>> retries each, the total will be TR + TR^2 + ... + TR^TB.)
>>         If you send the failure back from BoltA to the Spout, then we can
>> argue why not send it to the Spout from BoltB; as a framework, it
>> shouldn't be looking into whether BoltB or BoltA is really costly.
>>
>> 3. Also, failure scenarios are supposed to be really, really rare, and if
>> your database is down (meaning 100% of tuples will fail), then performance
>> won't be your only concern. Your concern will be to make sure the database
>> comes up and to reprocess all failed tuples.
>>
>> 4. Also, you would have to take care of retry logic in every Bolt.
>> Currently it is in only one place.
>>
>>
>>
>> *There is one thing I am looking forward to from Storm: informing the
>> Spout about what kind of failure it was*. I.e. if it was a
>> ConnectionTimeout or a ReadTimeout etc., that means if I retry it may
>> pass. But say it was a null pointer exception (Java world): I know the
>> data which is being expected is
>> not t

Re: [SURVEY] Which external modules (connector) do you use actively?

2016-09-14 Thread Alberto São Marcos
Storm JMS

On Tue, Sep 13, 2016 at 5:30 PM, Kevin Conaway 
wrote:

> We use the Kafka spout on Storm 0.10.0.
>
> We also make use of the Graphite metrics consumer library from Verisign.
>
> On Tue, Sep 13, 2016 at 4:57 AM, Jungtaek Lim  wrote:
>
>> Hi users,
>>
>> We're making some efforts on Storm SQL to help the Storm community build
>> and run topologies more easily.
>> Many features for Storm SQL are addressed or in progress. Along with the
>> efforts on those improvements, we would also like to expand the supported
>> data sources. (For now, Storm SQL only supports Kafka.)
>>
>> In order to rank the priority of what to support, I'd like to see which
>> external modules (connectors) the user community is actively using.
>>
>> You can also enumerate not-yet-released modules (Druid, Kinesis, OpenTSDB),
>> as they're also candidates.
>>
>> Thanks in advance!
>> Jungtaek Lim (HeartSaVioR)
>>
>> P.S. If you're interested in seeing the progress of Storm SQL, you can
>> have a look at the epic issues below. Contributions would be much
>> appreciated!
>>
>> - Storm SQL Phase II
>> - Storm SQL Phase III (adding data sources is placed here)
>>
>
>
>
> --
> Kevin Conaway
> http://www.linkedin.com/pub/kevin-conaway/7/107/580/
> https://github.com/kevinconaway
>


Re: How will storm replay the tuple tree?

2016-09-14 Thread Cheney Chen
Thank you guys for the discussion.

What if I want exactly-once processing for all nodes (bolts), even when
failures happen? Will Trident be the one?

On Wed, Sep 14, 2016 at 3:49 PM, Ravi Sharma  wrote:

> Hi T.I.
> A few things on why the Spout is responsible for replay rather than the
> various Bolts:
>
> 1. Ack and fail messages carry only the message ID. Usually your spout
> generates the message ID and knows what tuple/message is linked to it (via
> the source, i.e. JMS etc.). If an ack or fail happens, the Spout can do
> various things, like deleting from the queue on ack or putting the message
> in some dead-letter queue on fail. An intermediate Bolt won't know what
> message it sent, unless you implement something of your own. Technically
> you can put the delete-message-from-JMS logic in bolts, but then your whole
> topology knows where you are getting data from; what if tomorrow you start
> processing data from JMS, an HTTP REST service, a database, a file system,
> etc.?
>
> 2. BoltB fails and tells BoltA; BoltA retries 3 times, and it fails 3 times.
> Now what should BoltA do? Send it to another bolt (say a BoltPreA exists
> between it and the spout), or send it to the Spout?
>         If it sends it to BoltPreA, that means BoltPreA will retry 3
> times (just using the number 3; consider it N), which means for each try by
> BoltPreA, BoltA will retry again 3 times, so 9 retries in total. (Basically,
> with TB bolts from the Spout to the failing Bolt and TR retries each, the
> total will be TR + TR^2 + ... + TR^TB.)
>         If you send the failure back from BoltA to the Spout, then we can
> argue why not send it to the Spout from BoltB; as a framework, it shouldn't
> be looking into whether BoltB or BoltA is really costly.
>
> 3. Also, failure scenarios are supposed to be really, really rare, and if
> your database is down (meaning 100% of tuples will fail), then performance
> won't be your only concern. Your concern will be to make sure the database
> comes up and to reprocess all failed tuples.
>
> 4. Also, you would have to take care of retry logic in every Bolt.
> Currently it is in only one place.
>
>
>
> *There is one thing I am looking forward to from Storm: informing the Spout
> about what kind of failure it was*. I.e. if it was a ConnectionTimeout or a
> ReadTimeout etc., that means if I retry it may pass. But say it was a null
> pointer exception (Java world): I know the data being expected is not there
> and my code is not handling that scenario, so either I will have to change
> the code or ask the data provider to send that field, but a retry won't
> help me.
>
> Currently the only way to do this is to use an outside datastore like
> Redis: in whichever Bolt you fail, add a key with the messageId and the
> exception/error detail in Redis before calling fail, and then let the Spout
> read that data from Redis with the messageId received in the onFail call;
> the spout can then decide whether it wants to retry or not. I would usually
> create two wrappers, a Retry-able Exception and a *non*-Retry-able
> Exception, so each bolt can indicate whether a retry can help or not. It's
> up to you where you put this decision-making logic.
>
>
>
> Thanks
> Ravi.
>
>
>
>
>
>
> On Wed, Sep 14, 2016 at 6:43 AM, Tech Id  wrote:
>
>> Thanks Ambud,
>>
>> I did read some very good things about the acking mechanism in Storm, but
>> I am not sure it explains why point-to-point tracking is expensive.
>>
>> Consider the example: Spout--> BoltA--->BoltB.
>>
>> If BoltB fails, it will report failure to the acker.
>> If the acker can ask the Spout to replay, then why can't the acker ask
>> the parent of BoltB to replay at this point?
>> I don't think keeping track of a bolt's parent could be expensive.
>>
>>
>> On a related note, I am a little confused about a statement "When a new
>> tupletree is born, the spout sends the XORed edge-ids of each tuple
>> recipient, which the acker records in its pending ledger" in
>> Acking-framework-implementation.html.
>> How does the spout know beforehand which bolts would receive the tuple?
>> Bolts forward tuples to other bolts based on groupings and dynamically
>> generated fields. How does the spout know what fields will be generated and
>> which bolts will receive the tuples? If it does not know that, then how
>> does it send the XOR of each tuple recipient in a tuple's path, since each
>> tuple's path will be different (I think, not sure though)?
>>
>>
>> Thx,
>> T.I.
>>
>>
>> On Tue, Sep 13, 2016 at 6:37 PM, Ambud Sharma 
>> wrote:
>>
>>> Here is a post on it: https://bryantsai.com/fault-tolerant-message-processing-in-storm/.
>>>
>>> Point to point tracking is expensive unless you are using transactions.
>>> Flume does point to point transfers using transactions.
>>>
>>> On Sep 13, 2016 3:27 PM, "Tech Id"  wrote:
>>>
 I agree with this statement about code/architecture but in case of some
 system outages, like one of the end-points (Solr, Couchbase, Elastic-Search
 etc.) being down temporarily, a very large number of other fully-functional
 an

Re: How will storm replay the tuple tree?

2016-09-14 Thread Ravi Sharma
Hi T.I.
A few things on why the Spout is responsible for replay rather than the
various Bolts:

1. Ack and fail messages carry only the message ID. Usually your spout
generates the message ID and knows what tuple/message is linked to it (via
the source, i.e. JMS etc.). If an ack or fail happens, the Spout can do
various things, like deleting from the queue on ack or putting the message
in some dead-letter queue on fail. An intermediate Bolt won't know what
message it sent, unless you implement something of your own. Technically you
can put the delete-message-from-JMS logic in bolts, but then your whole
topology knows where you are getting data from; what if tomorrow you start
processing data from JMS, an HTTP REST service, a database, a file system,
etc.?

2. BoltB fails and tells BoltA; BoltA retries 3 times, and it fails 3 times.
Now what should BoltA do? Send it to another bolt (say a BoltPreA exists
between it and the spout), or send it to the Spout?
        If it sends it to BoltPreA, that means BoltPreA will retry 3 times
(just using the number 3; consider it N), which means for each try by
BoltPreA, BoltA will retry again 3 times, so 9 retries in total. (Basically,
with TB bolts from the Spout to the failing Bolt and TR retries each, the
total will be TR + TR^2 + ... + TR^TB.)
        If you send the failure back from BoltA to the Spout, then we can
argue why not send it to the Spout from BoltB; as a framework, it shouldn't
be looking into whether BoltB or BoltA is really costly.

3. Also, failure scenarios are supposed to be really, really rare, and if
your database is down (meaning 100% of tuples will fail), then performance
won't be your only concern. Your concern will be to make sure the database
comes up and to reprocess all failed tuples.

4. Also, you would have to take care of retry logic in every Bolt.
Currently it is in only one place.



*There is one thing I am looking forward to from Storm: informing the Spout
about what kind of failure it was*. I.e. if it was a ConnectionTimeout or a
ReadTimeout etc., that means if I retry it may pass. But say it was a null
pointer exception (Java world): I know the data being expected is not there
and my code is not handling that scenario, so either I will have to change
the code or ask the data provider to send that field, but a retry won't help
me.

Currently the only way to do this is to use an outside datastore like Redis:
in whichever Bolt you fail, add a key with the messageId and the
exception/error detail in Redis before calling fail, and then let the Spout
read that data from Redis with the messageId received in the onFail call;
the spout can then decide whether it wants to retry or not. I would usually
create two wrappers, a Retry-able Exception and a *non*-Retry-able
Exception, so each bolt can indicate whether a retry can help or not. It's
up to you where you put this decision-making logic.
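
A hedged sketch of that Redis hand-off, using the Jedis client; all class, method and key names here are illustrative assumptions:

import redis.clients.jedis.Jedis;

// Marker exceptions so each bolt can signal whether a retry could help.
class RetryableException extends RuntimeException {
    RetryableException(String msg, Throwable cause) { super(msg, cause); }
}

class NonRetryableException extends RuntimeException {
    NonRetryableException(String msg, Throwable cause) { super(msg, cause); }
}

public class FailureLedger {
    private final Jedis jedis = new Jedis("localhost");

    // Called by a bolt just before collector.fail(tuple).
    public void recordFailure(String msgId, Exception e) {
        String kind = (e instanceof RetryableException) ? "RETRY" : "DROP";
        jedis.hset("storm:failures", msgId, kind + ":" + e.getMessage());
    }

    // Called by the spout in its fail(Object msgId) callback: retry unless the
    // bolt marked the failure as non-retryable.
    public boolean shouldRetry(String msgId) {
        String detail = jedis.hget("storm:failures", msgId);
        return detail == null || detail.startsWith("RETRY");
    }
}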



Thanks
Ravi.






On Wed, Sep 14, 2016 at 6:43 AM, Tech Id  wrote:

> Thanks Ambud,
>
> I did read some very good things about the acking mechanism in Storm, but I
> am not sure it explains why point-to-point tracking is expensive.
>
> Consider the example: Spout--> BoltA--->BoltB.
>
> If BoltB fails, it will report failure to the acker.
> If the acker can ask the Spout to replay, then why can't the acker ask the
> parent of BoltB to replay at this point?
> I don't think keeping track of a bolt's parent could be expensive.
>
>
> On a related note, I am a little confused about a statement "When a new
> tupletree is born, the spout sends the XORed edge-ids of each tuple
> recipient, which the acker records in its pending ledger" in
> Acking-framework-implementation.html.
> How does the spout know beforehand which bolts would receive the tuple?
> Bolts forward tuples to other bolts based on groupings and dynamically
> generated fields. How does the spout know what fields will be generated and
> which bolts will receive the tuples? If it does not know that, then how
> does it send the XOR of each tuple recipient in a tuple's path, since each
> tuple's path will be different (I think, not sure though)?
>
>
> Thx,
> T.I.
>
>
> On Tue, Sep 13, 2016 at 6:37 PM, Ambud Sharma 
> wrote:
>
>> Here is a post on it: https://bryantsai.com/fault-tolerant-message-processing-in-storm/.
>>
>> Point to point tracking is expensive unless you are using transactions.
>> Flume does point to point transfers using transactions.
>>
>> On Sep 13, 2016 3:27 PM, "Tech Id"  wrote:
>>
>>> I agree with this statement about code/architecture but in case of some
>>> system outages, like one of the end-points (Solr, Couchbase, Elastic-Search
>>> etc.) being down temporarily, a very large number of other fully-functional
>>> and healthy systems will receive a large number of duplicate replays
>>> (especially in heavy throughput topologies).
>>>
>>> If you can elaborate a little more on the performance cost of tracking
>>> tuples or point to a document reflecting the same, that will be of great
>>> help.
>>>
>>> Best,
>>> T.I.
>>>
>>> On Tue, Sep 13, 2016 at 12:26 PM, Hart, Jame