Re: Syncing multiple streams to compute final result from a bolt

2016-09-20 Thread Harsh Choudhary
My use case is that I have a JSON which contains an array. I need to split
that array into multiple JSONs and do some computations on each. After
that, the results from all the JSONs have to be combined in a further
calculation to produce the final result.

*Cheers!*

Harsh Choudhary / Software Engineer


On Tue, Sep 20, 2016 at 9:09 PM, Ambud Sharma <asharma52...@gmail.com>
wrote:

> What's your use case?
>
> The complexities can be managed as long as your tuple branching is
> reasonable, i.e. one tuple creates several other tuples and you need to sync
> results between them.
>
> On Sep 20, 2016 8:19 AM, "Harsh Choudhary" <shry.ha...@gmail.com> wrote:
>
>> You're right. For that I would have to manage the queue and all those
>> complexities of timeouts. If Storm is not the right place to do this, then
>> what else is?
>>
>>
>>
>> On Tue, Sep 20, 2016 at 8:25 PM, Ambud Sharma <asharma52...@gmail.com>
>> wrote:
>>
>>> The correct way is to perform time-window aggregation using bucketing.
>>>
>>> Use the timestamp on your event, computed from the various stages, and send
>>> it to a single bolt where the aggregation happens. You only emit from this
>>> bolt once you receive results from both parts.
>>>
>>> It's like creating a barrier, or the join phase of a fork-join pool.
>>>
>>> That said, the more important question is: is Storm the right place to do
>>> this? When you perform time-window aggregation you are susceptible to tuple
>>> timeouts and also have to make sure your aggregation is idempotent.
>>>
>>> On Sep 20, 2016 7:49 AM, "Harsh Choudhary" <shry.ha...@gmail.com> wrote:
>>>
>>>> But how would that solve the syncing problem?
>>>>
>>>>
>>>>
>>>> On Tue, Sep 20, 2016 at 8:12 PM, Alberto São Marcos <
>>>> alberto@gmail.com> wrote:
>>>>
>>>>> I would dump the *Bolt-A* results in a shared-data-store/queue and
>>>>> have a separate workflow with another spout and Bolt-B draining from there
>>>>>
>>>>> On Tue, Sep 20, 2016 at 9:20 AM, Harsh Choudhary <shry.ha...@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> I am thinking of doing the following.
>>>>>>
>>>>>> The spout subscribes to Kafka and gets JSONs. It emits the JSONs as
>>>>>> individual tuples.
>>>>>>
>>>>>> Bolt-A subscribes to the spout. Bolt-A creates multiple JSONs from each
>>>>>> JSON and emits them as multiple streams.
>>>>>>
>>>>>> Bolt-B receives these streams and does the computation on them.
>>>>>>
>>>>>> I need to compute a cumulative result, in a bolt, from all the multiple
>>>>>> JSONs which emerged from a single JSON. But a bolt's static instance
>>>>>> variable is only shared between tasks within one worker. How do I
>>>>>> achieve this syncing?
>>>>>>
>>>>>>                    --->
>>>>>> Spout ---> Bolt-A  --->  Bolt-B ---> Final result
>>>>>>                    --->
>>>>>>
>>>>>> The final result is per JSON which was read from Kafka.
>>>>>>
>>>>>> Or is there a better way to achieve this?
>>>>>>
>>>>>
>>>>>
>>>>
>>
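To make the barrier / fork-join idea above concrete, here is a minimal sketch of such a join/aggregation bolt. It assumes (illustratively, these names are not from the thread) that the splitter tags every part with a "parentId" and a "partCount" field and that partial results arrive in a "partialResult" field. State is kept in memory only, so a tuple timeout or worker crash drops a half-filled bucket — exactly the idempotency caveat raised above.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    // Collects the partial results computed for one parent JSON and emits a
    // single combined result once all of them have arrived (the "barrier").
    public class JoinBolt extends BaseRichBolt {
        private OutputCollector collector;
        // parentId -> partial results received so far (in-memory only)
        private Map<String, List<Object>> pending;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.pending = new HashMap<>();
        }

        @Override
        public void execute(Tuple input) {
            String parentId = input.getStringByField("parentId");
            int expected = input.getIntegerByField("partCount");
            List<Object> parts = pending.computeIfAbsent(parentId, k -> new ArrayList<>());
            parts.add(input.getValueByField("partialResult"));
            if (parts.size() == expected) {            // barrier reached
                pending.remove(parentId);
                collector.emit(input, new Values(parentId, combine(parts)));
            }
            collector.ack(input);
        }

        private Object combine(List<Object> parts) {
            return parts; // placeholder for the real cumulative computation
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("parentId", "finalResult"));
        }
    }

For this to work, all parts of one parent JSON must reach the same task, e.g. with a fields grouping on "parentId" when wiring the topology; that also sidesteps the per-worker static-variable problem discussed in this thread.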


Re: Syncing multiple streams to compute final result from a bolt

2016-09-20 Thread Harsh Choudhary
You're right. For that I would have to manage the queue and all those
complexities of timeouts. If Storm is not the right place to do this, then
what else is?





Syncing multiple streams to compute final result from a bolt

2016-09-20 Thread Harsh Choudhary
Hi

I am thinking of doing the following.

The spout subscribes to Kafka and gets JSONs. It emits the JSONs as
individual tuples.

Bolt-A subscribes to the spout. Bolt-A creates multiple JSONs from each
JSON and emits them as multiple streams.

Bolt-B receives these streams and does the computation on them.

I need to compute a cumulative result, in a bolt, from all the multiple
JSONs which emerged from a single JSON. But a bolt's static instance
variable is only shared between tasks within one worker. How do I achieve
this syncing?

                   --->
Spout ---> Bolt-A  --->  Bolt-B ---> Final result
                   --->

The final result is per JSON which was read from Kafka.

Or is there a better way to achieve this?
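For illustration, a rough sketch of what Bolt-A's split step could look like. It assumes org.json for parsing and a top-level array field named "items" (both are assumptions, not part of the original question); each part is tagged with a shared parentId and the total part count so a downstream bolt can tell when it has seen everything:

    import java.util.Map;
    import java.util.UUID;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.json.JSONArray;
    import org.json.JSONObject;

    // Bolt-A: splits the array out of one incoming JSON and emits one tuple
    // per element, tagged so the parts can be re-joined downstream.
    public class SplitterBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            JSONObject json = new JSONObject(input.getStringByField("json"));
            JSONArray items = json.getJSONArray("items"); // assumed field name
            String parentId = UUID.randomUUID().toString();
            for (int i = 0; i < items.length(); i++) {
                // Anchor to the input so a lost part replays the whole parent JSON.
                collector.emit(input, new Values(parentId, items.length(), items.get(i).toString()));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("parentId", "partCount", "part"));
        }
    }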


Re: Syncing multiple streams to compute final result from a bolt

2016-09-23 Thread Harsh Choudhary
Thanks for all the help. :)





Re: Syncing multiple streams to compute final result from a bolt

2016-09-20 Thread Harsh Choudhary
But how would that solve the syncing problem?





Re: Syncing multiple streams to compute final result from a bolt

2016-09-21 Thread Harsh Choudhary
It is real-time. I get streaming JSONs from Kafka.



On Wed, Sep 21, 2016 at 4:15 AM, Ambud Sharma <asharma52...@gmail.com>
wrote:

> Is this real-time or batch?
>
> If batch, this is perfect for MapReduce or Spark.
>
> If real-time, then you should use Spark or Storm Trident.
>


Storm spout sends next tuple before completion of current

2016-11-06 Thread Harsh Choudhary
Hi

I have a bolt (SPLITTER) which receives data from a KafkaSpout. The SPLITTER
bolt splits the data and emits it on multiple streams to another bolt
(WRANGLER). The WRANGLER takes some time to process some of the data, so
before it can emit the data on another stream, the spout has already sent
the next data to SPLITTER.

I have configured TOPOLOGY_MAX_SPOUT_PENDING to be 1
and TOPOLOGY_MESSAGE_TIMEOUT_SECS to a very high number. I do not want the
spout to send the next tuple before I am done processing the current one. I
am acking the tuple at the end of each bolt.

It runs fine when the bolts do not take much time to process the data. I
reproduced the problem by putting a sleep of 10 seconds in a bolt.
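For what it's worth, TOPOLOGY_MAX_SPOUT_PENDING only throttles a spout that emits tuples with a message id, and the pending count drops as soon as the tuple tree completes, so every bolt has to anchor its emits to the input tuple; an unanchored emit lets the tree complete while WRANGLER is still working, and the spout moves on. A minimal sketch of the configuration, with illustrative values:

    import org.apache.storm.Config;

    public final class ThrottledConfig {
        static Config build() {
            Config conf = new Config();
            conf.setMaxSpoutPending(1);       // TOPOLOGY_MAX_SPOUT_PENDING
            conf.setMessageTimeoutSecs(3600); // TOPOLOGY_MESSAGE_TIMEOUT_SECS
            return conf;
        }
    }

In each bolt the emit should be anchored before the ack, i.e. collector.emit(input, new Values(...)) followed by collector.ack(input), so the parent tuple stays pending until every downstream tuple is acked.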


Re: NullPointerException on startup

2016-11-18 Thread Harsh Choudhary
Hi
This happens when there is some code in a bolt or spout which throws a
NullPointerException. I suggest you use the debugger in your IDE to find out
where this is happening. You can try creating a LocalCluster and running the
topology in your IDE to figure it out more easily.

It almost never happens because of Storm itself, so do not raise a ticket for now.
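For reference, a minimal LocalCluster harness of the kind described, sketched on the assumption that the topology is built with a TopologyBuilder (class and topology names here are illustrative):

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;

    public class DebugTopologyMain {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // Wire up the same spout and bolts as in the production topology.

            Config conf = new Config();
            conf.setDebug(true); // log every emitted tuple

            // Runs the whole cluster inside this JVM, so an IDE debugger can
            // break directly inside spout/bolt code.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("debug-topology", conf, builder.createTopology());
            Thread.sleep(60_000); // keep it alive long enough to step through
            cluster.shutdown();
        }
    }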



On Sat, Nov 19, 2016 at 12:40 AM, Cuneo, Nicholas 
wrote:

> Hello,
>
>
>
> We are working with Storm 1.0.2 and using the Kafka client to subscribe to
> Kafka topics to retrieve data. Randomly, when our topologies are starting
> up, we receive a null pointer exception which kills the topology. We
> can’t seem to identify why this happens, or what measures we can take to
> prevent it. I’m considering raising a ticket with Storm to check for null
> to prevent the topology from crashing.
>
>
>
> 2016-11-17 23:11:05.366 o.a.s.util [ERROR] Async loop died!
>
> java.lang.RuntimeException: java.lang.NullPointerException
>
> at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>
> Caused by: java.lang.NullPointerException
>
> at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
> at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
> ... 7 more
>
> 2016-11-17 23:11:05.379 o.a.s.d.executor [ERROR]
>
> java.lang.RuntimeException: java.lang.NullPointerException
>
> at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>
> Caused by: java.lang.NullPointerException
>
> at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
> at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
> ... 7 more
>
> 2016-11-17 23:11:05.473 o.a.s.util [ERROR] Halting process: ("Worker died")
>
> java.lang.RuntimeException: ("Worker died")
>
> at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.2.jar:1.0.2]
> at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
> at org.apache.storm.daemon.worker$fn__8663$fn__8664.invoke(worker.clj:765) [storm-core-1.0.2.jar:1.0.2]
> at org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274) [storm-core-1.0.2.jar:1.0.2]
> at

Re: NullPointerException on startup

2016-11-18 Thread Harsh Choudhary
I think you may want to start Kafka, give it some time to initialise, and
then submit the topology.

On Sat, Nov 19, 2016 at 12:55 AM, Cuneo, Nicholas <ncu...@tycoint.com>
wrote:

> The cause of the null pointer exception is IN kafka spout during an ACK
>
>
>
> My guess is there’s a race condition where it tries to retrieve the topic
> partition from the map but it hasn’t been loaded yet, which results in a
> NULL entry during the GET. When the topology starts up it will immediately
> receive messages, but it can be slow to start up due to rebalancing of the
> Kafka offsets or other tasks it does when it subscribes.
>
>
>
> Like I said, this happens occasionally during startup, but not reliably, and
> it has nothing to do with my code other than that I’m probably acking the
> received message faster than the Kafka spout can finish initialization.
>
>
>
> Thanks,
>
> Nick
>
>
>

Re: NullPointerException on startup

2016-11-18 Thread Harsh Choudhary
If that is the case then it should not happen. If, as you suspect, 'there’s
a race condition where it tries to retrieve the topic partition from the
map but it hasn’t been loaded yet', then, since Kafka has been up for a long
time, there should not be such a loading problem. Have you modified the
Kafka spout?

*Cheers!*

Harsh Choudhary

On Sat, Nov 19, 2016 at 1:24 AM, Cuneo, Nicholas <ncu...@tycoint.com> wrote:

> The spout is initialized during topology submission, so how would you
> delay that?  Kafka is already running for a long period of time.
>
> Thanks,
> Nick
>
>

Re: [storm-kafka] where is the Kafka Spout consumer's offset stored?

2017-03-28 Thread Harsh Choudhary
On Tue, Mar 28, 2017 at 1:07 PM, Alexandre Vermeerbergen <
avermeerber...@gmail.com> wrote:

> Hi Harsh,
>
> Thanks a lot for your answer.
>



> So you mean that I should look for Kafka spout offsets in the ZooKeeper
> servers which Storm uses for Nimbus/Supervisor communication, is that
> right?
>
Yes


>
> My problem is finding the zk path of these offsets: in the ZooKeeper
> servers associated with my Kafka brokers, I have for example offsets stored
> under the following path: /consumers//offsets
>
> However, when I access Storm's zookeeper's nodes using zkCli.sh, I do not
> find any /consumers path.
>

> What is the path of the zk nodes which contain the offset of the Kafka
> spouts?
>

There is no consumers folder, but a folder named after the topic id you
provided in the SpoutConfig. It will be like /topic-id.
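As an illustration (the host, topic, and id values below are made up), with storm-kafka the path comes from the zkRoot and id arguments of the SpoutConfig constructor:

    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.ZkHosts;

    public final class OffsetPathExample {
        static SpoutConfig build() {
            BrokerHosts hosts = new ZkHosts("kafka-zk:2181"); // Kafka's ZooKeeper
            // zkRoot = "/storm-kafka", id = "my-topic-id" are illustrative
            SpoutConfig cfg = new SpoutConfig(hosts, "my-topic", "/storm-kafka", "my-topic-id");
            // Offsets are committed to Storm's own ZooKeeper (unless
            // cfg.zkServers/zkPort are overridden) under <zkRoot>/<id>/partition_<N>,
            // e.g. /storm-kafka/my-topic-id/partition_0
            return cfg;
        }
    }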

>
> Best regards,
> Alexandre Vermeerbergen
>
>


Re: [storm-kafka] where is the Kafka Spout consumer's offset stored?

2017-03-28 Thread Harsh Choudhary
Storm stores its offsets in the ZooKeeper it is connected to. So you
won't find the offset information for the Storm consumers in the same place
where other clients' offsets are. You must have provided a topic id in the
SpoutConfig in Storm; that is where it stores the offset information in its
ZooKeeper.

*Cheers!*

Harsh Choudhary / Software Engineer


On Tue, Mar 28, 2017 at 11:27 AM, Alexandre Vermeerbergen <
avermeerber...@gmail.com> wrote:

> Hello,
>
> I need to programmatically monitor the offset of my Kafka Spouts (based on
> storm-kafka, not storm-kafka-client).
>
> I already have a probe checking the offsets of all consumers of my Kafka
> broker cluster. It takes into account both ZooKeeper-based old consumers
> and newer ones managing their offsets directly in the Kafka brokers.
>
> But my probe does not "see" any of the consumers corresponding to the Kafka
> spouts.
>
> So my question is: where does Kafka spouts (from storm-kafka) store their
> offsets as Kafka consumers?
>
> Tested configurations:
> * Storm 1.0.1, Storm 1.0.3 and Storm 1.1.0 rc3,
> * Kafka Brokers : 0.1.10.1
>
>
> Best regards,
> Alexandre Vermeerbergen
>
>


Re: Storm files and folders permissions on Linux

2017-03-30 Thread Harsh Choudhary
It depends on which user you are running Storm as. That user must own
Storm's folders. So you need chown rather than chmod.

*Cheers!*

Harsh Choudhary

On Thu, Mar 30, 2017 at 11:24 PM, I PVP <i...@hotmail.com> wrote:

> What are the recommended files/folders permissions for running Storm on
> Linux ?
>
> Can I just set the entire folder tree to 700, or are there specific
> files/folders that require a particular permission?
>
>
> I am installing Storm 1.0.3 on Red Hat Enterprise Linux 7.3.
> Storm path is /opt/storm
> user: storm:storm.
>
>
> Thanks
>
> IPVP
>
>


Re: Centralized logging for storm

2017-03-31 Thread Harsh Choudhary
Hi Shashank

What we do is install Filebeat on our Storm cluster nodes; it ships the
log file data to our central log server, Graylog. This tool is great, and
you can see your logs as one stream of messages sorted by timestamp. One
thing that really helps is that you can also look up all the other logs
near a given timestamp.

*Cheers!*

Harsh Choudhary

On Fri, Mar 31, 2017 at 1:16 PM, Shashank Prasad <shash...@machaao.com>
wrote:

> Hi folks,
>
> Storm is a great tool but the logs are all over the place. As you increase
> your workers, your log files will increase as well and there is no single
> file it logs to.
>
> This makes it very hard to troubleshoot since you have to tail multiple
> logs.
>
> Ideally, I would like to ship all the logs for a topology to a centralized
> log server where I could use something like Kibana and filter the logs for
> what I am searching for.
>
> Does anyone have any suggestions on how to achieve this, or a use case of
> how you are currently doing it?
>
> Thanks a lot for your time!
>
> -shashank
>


Re: Storm files and folders permissions on Linux

2017-03-31 Thread Harsh Choudhary
Can you show the exact error you are getting?

*Cheers!*

Harsh Choudhary

On Thu, Mar 30, 2017 at 11:39 PM, I PVP <i...@hotmail.com> wrote:

> that is how it is being done as of now:
>
> sudo chown -R storm:storm /opt/storm
> sudo chmod -R 700 /opt/storm
>
> but I am still facing some issues while submitting topologies. It looks like
> Nimbus is not able to create blob store directories or files.
>
> I just tried sudo chmod -R 777 /opt/storm, but the issue is still happening.
>
>