Re: Introduce lag into a topology spout to process the events with a delay.

2017-02-21 Thread Sandeep Samudrala
Yes. I am reading both streams from Kafka as part of a topology.

On Wed, Feb 22, 2017 at 12:39 AM, Ankur Garg  wrote:

> Hi Sandeep ,
>
> One question: how are you reading Streams B and A? Are you reading from
> some messaging queue (Kafka, RabbitMQ, etc.) with some spout (as part of
> some topology) reading from them? Please confirm.
>
> Thanks
> Ankur
>
> On Tue, 21 Feb 2017 at 15:28 Sandeep Samudrala 
> wrote:
>
>> Hello,
>>  I have two streams, A and B. I need to enrich events coming from stream B
>> with events coming from A, so I store events coming from A in a key-value
>> store to enrich events from B. Events that don't get enriched are sent to
>> a deferred queue (Kafka stream) and are read back later.
>>
>> Most of the time the events from Stream B are sent to the defer queue
>> because of a slight delay in storing the events from Stream A into the
>> key-value store, and events coming into A and B are almost real-time.
>>
>> I want to introduce a delay into my spout's reads from
>> Stream B so as to make sure a higher percentage of events get enriched on the
>> first pass rather than after being read back from the defer queue. I tried
>> putting a check on the lag and throttling on the backlog queue to get a hold,
>> but it didn't seem right and ran into draining and other issues.
>>
>> Is there a way in the Kafka consumer or Storm spout to delay the incoming
>> data for processing?
>>
>> Thanks,
>> -sandeep.
>>
>
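
Since both streams come from Kafka, one possible approach is a hand-rolled spout
that buffers records and only emits those that are at least a configured age,
using the record timestamps Kafka carries since 0.10. A minimal sketch, not the
stock storm-kafka spout - broker address, group id, topic name, and delay are
all placeholders:

    import java.util.ArrayDeque;
    import java.util.Collections;
    import java.util.Deque;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class DelayingKafkaSpout extends BaseRichSpout {
        private static final long DELAY_MS = 5_000L; // placeholder: tune to the KV-store lag

        private SpoutOutputCollector collector;
        private transient KafkaConsumer<String, String> consumer;
        private transient Deque<ConsumerRecord<String, String>> buffer;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.buffer = new ArrayDeque<>();
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");            // placeholder
            props.put("group.id", "stream-b-delayed");                 // placeholder
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("stream-b")); // placeholder
        }

        @Override
        public void nextTuple() {
            if (buffer.isEmpty()) {
                // fetch more records; a short poll keeps nextTuple() responsive
                for (ConsumerRecord<String, String> r : consumer.poll(100)) {
                    buffer.add(r);
                }
            }
            ConsumerRecord<String, String> head = buffer.peek();
            // emit only once the record is at least DELAY_MS old
            if (head != null && System.currentTimeMillis() - head.timestamp() >= DELAY_MS) {
                buffer.poll();
                collector.emit(new Values(head.key(), head.value()));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("key", "value"));
        }
    }

Note the sketch leaves offsets to the consumer's default auto-commit, so it is
not at-least-once; a production version would tie commits to acks.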


Re: Storm Kafka offset monitoring

2017-02-21 Thread pradeep s
Hi Priyank
Currently I tested the spout using ZooKeeper broker hosts. This is
processing fine. But I saw another way of initialising the spout, using Kafka
bootstrap servers via the KafkaSpoutConfig class.


https://storm.apache.org/releases/1.0.2/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html



But implementing it this way, I was getting a commit failed exception due to a
rebalance.
Can you point out the proper way to implement the Kafka spout?
In the Storm 1.0.3 docs I have seen the way using ZooKeeper broker hosts.
Regards
Pradeep S
On Tue, Feb 21, 2017 at 2:35 PM Priyank Shah  wrote:

> Hi Pradeep,
>
>
>
> A release vote for RC1 of 1.1.0 is in progress. You can track that and
> once it gets released you can upgrade.
>
>
>
> Regarding upgrading your spout, you don’t need to make any code changes.
> You just need to use the latest released spout code. Usually it involves
> updating the pom file that has a dependency on the storm-kafka module to the
> latest version.
>
>
>
> *From: *pradeep s 
> *Reply-To: *"user@storm.apache.org" 
> *Date: *Tuesday, February 21, 2017 at 12:49 PM
> *To: *"user@storm.apache.org" 
> *Subject: *Re: Storm Kafka offset monitoring
>
>
>
> Hi Priyank
>
> Thanks for your reply. I was not able to find the 1.1.0 version of Storm.
>
> Can you please point to that? Also, can you please confirm what specific
> spout changes to make.
>
> Regards
>
> Pradeep S
>
> On Tue, Feb 21, 2017 at 10:54 AM Priyank Shah 
> wrote:
>
> Hi Pradeep,
>
>
>
> If you upgrade your spout in the topology and Storm code to a later
> version (I checked v1.1.0 and it has the tool) you will get a table in the
> Storm UI which shows you offsets. If you cannot upgrade then I think you
> will have to do it manually.
>
>
>
> *From: *pradeep s 
> *Reply-To: *"user@storm.apache.org" 
> *Date: *Tuesday, February 21, 2017 at 9:44 AM
> *To: *"user@storm.apache.org" 
> *Subject: *Storm Kafka offset monitoring
>
>
>
> Hi,
>
> I am using Storm 1.0.2 and Kafka 0.10.1.1. The Storm spout is
> configured using ZooKeeper broker hosts. Is there a monitoring UI which I
> can use to track the consumer offsets and lag?
>
> I was using Yahoo Kafka Manager, but it's showing the Storm spout as a
> consumer. Any help?
>
> Regards
>
> Pradeep S
>
>
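
On the rebalance-triggered commit failure: it usually means the consumer spent
longer between poll() calls than the group allows. A rough sketch of consumer
properties worth tuning - broker list and group id are placeholders, and the
exact KafkaSpoutConfig.Builder signature differs between 1.0.x and 1.1.x, so
treat this as a starting point rather than the definitive fix:

    import java.util.HashMap;
    import java.util.Map;

    // consumer properties to hand to the spout's KafkaSpoutConfig.Builder
    Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder
    props.put("group.id", "storm-kafka-spout");                  // placeholder
    props.put("max.poll.records", 200);        // fetch fewer records per poll()
    props.put("max.poll.interval.ms", 300000); // more processing time between polls (Kafka >= 0.10.1)
    props.put("session.timeout.ms", 30000);    // fewer spurious rebalances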


Re: Storm Kafka offset monitoring

2017-02-21 Thread Priyank Shah
Hi Pradeep,

A release vote for RC1 of 1.1.0 is in progress. You can track that and once it 
gets released you can upgrade.

Regarding upgrading your spout, you don’t need to make any code changes. You
just need to use the latest released spout code. Usually it involves updating the
pom file that has a dependency on the storm-kafka module to the latest version.

From: pradeep s 
Reply-To: "user@storm.apache.org" 
Date: Tuesday, February 21, 2017 at 12:49 PM
To: "user@storm.apache.org" 
Subject: Re: Storm Kafka offset monitoring

Hi Priyank
Thanks for your reply. I was not able to find the 1.1.0 version of Storm.
Can you please point to that? Also, can you please confirm what specific
spout changes to make.
Regards
Pradeep S
On Tue, Feb 21, 2017 at 10:54 AM Priyank Shah wrote:
Hi Pradeep,

If you upgrade your spout in the topology and Storm code to a later version (I
checked v1.1.0 and it has the tool) you will get a table in the Storm UI which
shows you offsets. If you cannot upgrade then I think you will have to do it manually.

From: pradeep s
Reply-To: "user@storm.apache.org"
Date: Tuesday, February 21, 2017 at 9:44 AM
To: "user@storm.apache.org"
Subject: Storm Kafka offset monitoring

Hi,
I am using Storm 1.0.2 and Kafka 0.10.1.1. The Storm spout is configured
using ZooKeeper broker hosts. Is there a monitoring UI which I can use to track
the consumer offsets and lag?
I was using Yahoo Kafka Manager, but it's showing the Storm spout as a consumer.
Any help?
Regards
Pradeep S


Re: Storm Kafka offset monitoring

2017-02-21 Thread pradeep s
Hi Priyank
Thanks for your reply. I was not able to find the 1.1.0 version of Storm.
Can you please point to that? Also, can you please confirm what specific
spout changes to make.
Regards
Pradeep S
On Tue, Feb 21, 2017 at 10:54 AM Priyank Shah  wrote:

> Hi Pradeep,
>
>
>
> If you upgrade your spout in the topology and Storm code to a later
> version (I checked v1.1.0 and it has the tool) you will get a table in the
> Storm UI which shows you offsets. If you cannot upgrade then I think you
> will have to do it manually.
>
>
>
> *From: *pradeep s 
> *Reply-To: *"user@storm.apache.org" 
> *Date: *Tuesday, February 21, 2017 at 9:44 AM
> *To: *"user@storm.apache.org" 
> *Subject: *Storm Kafka offset monitoring
>
>
>
> Hi,
>
> I am using Storm 1.0.2 and Kafka 0.10.1.1. The Storm spout is
> configured using ZooKeeper broker hosts. Is there a monitoring UI which I
> can use to track the consumer offsets and lag?
>
> I was using Yahoo Kafka Manager, but it's showing the Storm spout as a
> consumer. Any help?
>
> Regards
>
> Pradeep S
>


Re: Kafka Spout enable.auto.commit=false

2017-02-21 Thread Igor Kuzmenko
Thanks, Hugo. That's all I wanted to know about this.

On Tue, Feb 21, 2017 at 9:01 PM, Hugo Da Cruz Louro 
wrote:

> As per the KafkaConsumer documentation:
>
> “ Setting enable.auto.commit=true means that offsets are committed
> automatically with a frequency controlled by the config
> auto.commit.interval.ms. “
>
> That means that the Spout disregards whether the Tuple is acked or not; every
> time interval (as controlled by auto.commit.interval.ms) the offset will
> be committed. This can have an impact on the delivery guarantees,
> because an offset may be committed, yet the tuple may fail.
>
> On Feb 20, 2017, at 8:15 AM, Igor Kuzmenko  wrote:
>
> Hello, I'd like to understand the difference between auto commit mode
> true/false in the new KafkaSpout.
>
> With enable.auto.commit = false the KafkaSpout will move my offset relying on
> acked tuples; that seems easy.
>
> But what happens if I turn auto commit on?
> How does Kafka decide which offset to commit?
>
>
>


Re: Introduce lag into a topology spout to process the events with a delay.

2017-02-21 Thread Ankur Garg
Hi Sandeep ,

One question: how are you reading Streams B and A? Are you reading from
some messaging queue (Kafka, RabbitMQ, etc.) with some spout (as part of
some topology) reading from them? Please confirm.

Thanks
Ankur

On Tue, 21 Feb 2017 at 15:28 Sandeep Samudrala  wrote:

> Hello,
>  I have two streams, A and B. I need to enrich events coming from stream B
> with events coming from A, so I store events coming from A in a key-value
> store to enrich events from B. Events that don't get enriched are sent to
> a deferred queue (Kafka stream) and are read back later.
>
> Most of the time the events from Stream B are sent to the defer queue
> because of a slight delay in storing the events from Stream A into the
> key-value store, and events coming into A and B are almost real-time.
>
> I want to introduce a delay into my spout's reads from Stream
> B so as to make sure a higher percentage of events get enriched on the first
> pass rather than after being read back from the defer queue. I tried putting
> a check on the lag and throttling on the backlog queue to get a hold, but it
> didn't seem right and ran into draining and other issues.
>
> Is there a way in the Kafka consumer or Storm spout to delay the incoming
> data for processing?
>
> Thanks,
> -sandeep.
>


Re: Storm Kafka offset monitoring

2017-02-21 Thread Priyank Shah
Hi Pradeep,

If you upgrade your spout in the topology and Storm code to a later version (I
checked v1.1.0 and it has the tool) you will get a table in the Storm UI which
shows you offsets. If you cannot upgrade then I think you will have to do it manually.

From: pradeep s 
Reply-To: "user@storm.apache.org" 
Date: Tuesday, February 21, 2017 at 9:44 AM
To: "user@storm.apache.org" 
Subject: Storm Kafka offset monitoring

Hi,
I am using Storm 1.0.2 and Kafka 0.10.1.1. The Storm spout is configured
using ZooKeeper broker hosts. Is there a monitoring UI which I can use to track
the consumer offsets and lag?
I was using Yahoo Kafka Manager, but it's showing the Storm spout as a consumer.
Any help?
Regards
Pradeep S


Re: Kafka Spout enable.auto.commit=false

2017-02-21 Thread Hugo Da Cruz Louro
As per the KafkaConsumer documentation:

“ Setting enable.auto.commit=true means that offsets are committed 
automatically with a frequency controlled by the config 
auto.commit.interval.ms. “

That means that the Spout disregards whether the Tuple is acked or not; every time
interval (as controlled by auto.commit.interval.ms) the offset will be
committed. This can have an impact on the delivery guarantees, because an
offset may be committed, yet the tuple may fail.

On Feb 20, 2017, at 8:15 AM, Igor Kuzmenko wrote:

Hello, I'd like to understand the difference between auto commit mode true/false
in the new KafkaSpout.

With enable.auto.commit = false the KafkaSpout will move my offset relying on
acked tuples; that seems easy.

But what happens if I turn auto commit on?
How does Kafka decide which offset to commit?
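
To make the two modes concrete, here is a minimal sketch of the consumer
properties involved (plain KafkaConsumer settings; the interval value is
illustrative):

    import java.util.Properties;

    Properties props = new Properties();

    // Mode 1: spout-managed commits. Offsets are committed only after the
    // corresponding tuples are acked, giving at-least-once processing.
    props.put("enable.auto.commit", "false");

    // Mode 2: time-based auto-commit. Offsets are committed on a timer,
    // regardless of acking, so a committed tuple can still fail later.
    // props.put("enable.auto.commit", "true");
    // props.put("auto.commit.interval.ms", "5000");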



Re: Kafka spout stops committing offsets on some partitions

2017-02-21 Thread Hugo Da Cruz Louro
Hi Igor,

In normal operation this shouldn’t happen; it could have happened due to a
consumer rebalance. We have improved the code recently. From this info alone
it’s hard to tell what really happened. I can suggest a couple of things.

1. Try reproducing the issue again.
2. Enable logs at ALL (not only DEBUG) to see which tuples get committed and
which don’t (this may print a lot of messages, however).
3. If you have the chance, test a more up-to-date version of the spout. There
have been a few bug fixes, one of which could handle this case (if it’s indeed an issue).

Thanks,
Hugo

On Feb 16, 2017, at 12:59 PM, Igor Kuzmenko wrote:

Thanks for reply Hugo.
I'll double-check the log tomorrow, looking for KafkaSpoutRetryExponentialBackoff
calls.

I just noticed a strange thing in the log I have. The first message is
"Unexpected offset found [2777849]". It's strange because if you look at
partition 10's committed offset, it is 2777978, which is a little higher than the
offset found. The next message in the log was "No offsets ready to commit."

So, after checking offset 2777849 it immediately stopped seeking a new offset to
commit.

On Thu, Feb 16, 2017 at 8:23 PM, Hugo Da Cruz Louro wrote:
Hi,

Most likely this is happening because some messages failed and/or got acked out 
of order.

For example, if you process messages with offsets 1,2,3,X,5,6,7,… where X is the
message (with offset 4) that failed, the Spout will only commit offset 3. Until
the message with offset 4 is acked, or reaches the max number of retries (which is
configurable, but by default is forever), the messages with offsets 5,6,7,… will
not get committed despite having been acked. That is because you cannot do
kafkaConsumer.commitSync(new TopicPartition(test_topic, 5)) if the message with
offset 4 has not been acked or discarded by reaching the max number of
retries. Until the spout moves on from the message with offset 4, the lag will
increase as new messages come in.
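
To make the contiguity rule concrete, a small self-contained illustration of
the logic (not the spout's actual code):

    import java.util.Arrays;
    import java.util.NavigableSet;
    import java.util.TreeSet;

    // offsets 5,6,7 are acked, offset 4 failed; the last committed offset is 3
    NavigableSet<Long> acked = new TreeSet<>(Arrays.asList(5L, 6L, 7L));
    long committed = 3L;

    long commitUpTo = committed;
    for (long offset : acked) {
        if (offset == commitUpTo + 1) {
            commitUpTo = offset; // extend the contiguous run of acked offsets
        } else {
            break;               // gap at commitUpTo + 1 (the failed tuple) blocks the commit
        }
    }
    // commitUpTo is still 3: nothing new can be committed until offset 4 is
    // acked or discarded, no matter how many later offsets are acked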

You can try setting the log level to ALL for
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff to see which
messages are getting retried. You can also set the log level to DEBUG or ALL for
org.apache.storm.kafka.spout.KafkaSpout to see exactly which offsets/records
are being processed. However, it will print a lot of messages, and may slow
down processing considerably.

You can also set the maxNumberOfRetries to a small number (e.g. 3-5) to see if
that solves this situation.

Hugo

> On Feb 16, 2017, at 8:36 AM, Igor Kuzmenko wrote:
>
> Today in Storm UI I saw this Kafka Spouts Lag:
> Id            Topic        Partition   Latest Offset   Spout Committed Offset   Lag
> Kafka Spout   test_topic   0           5591087         5562814                  28273
> Kafka Spout   test_topic   1           2803256         2789090                  14166
> Kafka Spout   test_topic   2           2801927         2787767                  14160
> Kafka Spout   test_topic   3           2800627         2800626                  1
> Kafka Spout   test_topic   4           2799391         2785238                  14153
> Kafka Spout   test_topic   5           2798126         2798125                  1
> Kafka Spout   test_topic   6           2796874         2782726                  14148
> Kafka Spout   test_topic   7           2795669         2781528                  14141
> Kafka Spout   test_topic   8           2794419         2780280                  14139
> Kafka Spout   test_topic   9           2793255         2793254                  1
> Kafka Spout   test_topic   10          2792109         2777978                  14131
> Kafka Spout   test_topic   11          2790939         2776817                  14122
> Kafka Spout   test_topic   12          2789783         2775665                  14118
> Kafka Spout   test_topic   13          2788651         2774539                  14112
> Kafka Spout   test_topic   14          2787521         2773412                  14109
>
>
> There were no new messages in that topic for a while, so I expected that my
> topology would process all messages. But the lag shows me that there are some
> uncommitted messages on most of the partitions. The topology stopped working and
> didn't process any messages for a few hours.
>
> In logs I found these messages:
> 2017-02-16 14:50:20.187 o.a.s.k.s.KafkaSpout [DEBUG] Unexpected offset found 
> [2777849]. OffsetEntry{topic-partition=test_topic-10, fetchOffset=2775755, 
> committedOffset=2777978, ackedMsgs=[{topic-partition=test_topic-10, 
> offset=2777849, numFails=0}, {topic-partition=test_topic-10, offset=2777850, 
> 

Storm Kafka offset monitoring

2017-02-21 Thread pradeep s
Hi,
I am using Storm 1.0.2 and Kafka 0.10.1.1. The Storm spout is
configured using ZooKeeper broker hosts. Is there a monitoring UI which I
can use to track the consumer offsets and lag?
I was using Yahoo Kafka Manager, but it's showing the Storm spout as a
consumer. Any help?
Regards
Pradeep S
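
If the spout commits its offsets to Kafka (the newer storm-kafka-client spout
does; the older ZooKeeper-based storm-kafka spout keeps them in ZooKeeper, which
is why Kafka-side tools may not see its lag), the lag can also be computed
directly with the consumer API. A rough sketch - broker, group id, and topic
are placeholders, and endOffsets needs Kafka >= 0.10.1:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.stream.Collectors;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092"); // placeholder
    props.put("group.id", "storm-kafka-spout");     // placeholder: the spout's group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        List<TopicPartition> partitions = consumer.partitionsFor("test_topic").stream()
                .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                .collect(Collectors.toList());
        Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);
        for (TopicPartition tp : partitions) {
            OffsetAndMetadata committed = consumer.committed(tp);
            long lag = latest.get(tp) - (committed == null ? 0L : committed.offset());
            System.out.printf("%s lag=%d%n", tp, lag);
        }
    }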


RE: Local Mode issues and undocumented behaviour

2017-02-21 Thread paul.milli...@baesystems.com
Hi Petr,

I encountered a similar issue to your first point a while ago. See
https://issues.apache.org/jira/browse/STORM-2038 (STORM-2038) for some more
of the discussion.

Regards,
Paul

-Original Message-
From: Petr Janeček [mailto:janecekp...@seznam.cz]
Sent: 20 February 2017 10:02
To: user@storm.apache.org
Subject: Local Mode issues and undocumented behaviour

Hello,

you might notice the email below is a simplified repost, as the last one got no
attention, which may have been because it was in the wrong thread. Sorry about
that, but any answer from a reliable source qualifies - even "nobody knows
anymore" or "not sure, please file a Jira with a minimal reproducible test".

We're using Storm heavily and are trying to get things tested locally as much 
as possible. While doing so, we accumulated a few questions:


1. Since 1.0.3 the Local Cluster on Windows needs the ability to create 
symlinks, but it did not need to do that before. Both 
`LocalCluster.submitTopology()` and `Testing.withSimulatedTimeLocalCluster() 
... Testing.completeTopology()` do this, and it's a major pain for local 
development where our IDEs constantly run tests in local mode.

We read the linked documentation (which 404s
for 1.0.1 and 1.0.2, by the way), and running our IDEs as Administrator fixes
the issue.

It might be a good idea to add this change to the release notes -
"Running in Local Mode now requires the symlink creation permission, too."
Introducing new major features in .build versions is unfortunate :(. Is there
any configuration to revert to the old behaviour, please?


2. Since 1.0.3, every time we run any test on the Local Cluster, an extra
directory is created in the root of our project in the IDE:
./logs/workers-artifacts/topologytest-random-uuid, and it contains a single
file, "worker.yaml".

Is there anything we can do to move this logging somewhere else,
preferably to the ./target directory? I went through the release notes and did
not find anything related.
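
One knob that may be relevant here - an assumption on my part, not verified
against 1.0.3 - is the worker artifacts directory setting from defaults.yaml:

    import org.apache.storm.Config;

    Config conf = new Config();
    // assumption: "storm.workers.artifacts.dir" is the defaults.yaml key for where
    // worker artifacts (worker.yaml, worker logs) are written; unverified in local mode
    conf.put("storm.workers.artifacts.dir", "target/workers-artifacts");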


3. How does `Testing.completeTopology()` know the topology is completed? We're 
not acking any tuples, so I'd expect the method to return once all tuples have 
internally timed out (or the `CompleteTopologyParam` timeout has passed). 
However, the method returns much sooner (sooner than we'd say our topology is 
"completed"), implying a more clever strategy. Is this deterministic? Are there 
any knobs to turn?


4. Does local mode not honor `conf.registerSerialization()`? This seems
strange, but if we're sending an instance of the `OurData` class in local mode,
the serialization fails with `NotSerializableException`, like this:

java.io.NotSerializableException: com.our.company.data.OurData
at org.apache.storm.utils.Utils.javaSerialize(Utils.java:236)
at 
org.apache.storm.thrift$serialize_component_object.invoke(thrift.clj:172)
at 
org.apache.storm.testing$complete_topology.doInvoke(testing.clj:514)
at clojure.lang.RestFn.invoke(RestFn.java:1124)
at 
org.apache.storm.testing4j$_completeTopology.invoke(testing4j.clj:63)
at org.apache.storm.Testing.completeTopology(Unknown Source)

 ...even though we have used `conf.registerSerialization(OurData.class, 
OurDataSerializer.class);`

Slapping `Serializable` on the class fixes the issue, but obviously that's
not the solution - we don't want to change our class because of local testing,
and we definitely *want* local testing to use the same serialization mechanism
as production. I'm quite sure there are a lot of people using this functionality,
so we probably just overlooked something? We even tried:

conf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);  // 
Thanks for fixing this in 1.0.3, by the way!
conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false);

By the way, as far as I know, Kryo can serialize non-serializable classes,
too, by using the `FieldSerializer`. Is there any hidden option to enable this
by default instead of Java serialization? Do you have any plans to use this
instead of Java serialization?
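
For reference, the registered Serializer has the usual Kryo shape; a minimal
sketch, with `OurData`'s fields entirely hypothetical:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class OurDataSerializer extends Serializer<OurData> {
        @Override
        public void write(Kryo kryo, Output output, OurData data) {
            // hypothetical fields: a String name and a long value
            output.writeString(data.getName());
            output.writeLong(data.getValue());
        }

        @Override
        public OurData read(Kryo kryo, Input input, Class<OurData> type) {
            return new OurData(input.readString(), input.readLong()); // hypothetical ctor
        }
    }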


Thank you in advance for any responses, we've been scratching our heads lately.
Petr Janeček


Introduce lag into a topology spout to process the events with a delay.

2017-02-21 Thread Sandeep Samudrala
Hello,
 I have two streams, A and B. I need to enrich events coming from stream B
with events coming from A, so I store events coming from A in a key-value
store to enrich events from B. Events that don't get enriched are sent to
a deferred queue (Kafka stream) and are read back later.

Most of the time the events from Stream B are sent to the defer queue
because of a slight delay in storing the events from Stream A into the
key-value store, and events coming into A and B are almost real-time.

I want to introduce a delay into my spout's reads from Stream
B so as to make sure a higher percentage of events get enriched on the first
pass rather than after being read back from the defer queue. I tried putting a
check on the lag and throttling on the backlog queue to get a hold, but it
didn't seem right and ran into draining and other issues.

Is there a way in the Kafka consumer or Storm spout to delay the incoming data
for processing?

Thanks,
-sandeep.