Re: Adding new user to the broker dynamically

2017-08-01 Thread Alexei Levashov
Hello Manikumar,

I appreciate your advice , thank you.

I tried to use SASL_PLAINTEXT with SCRAM enabled, hoping that the lack of SSL
will make debugging easier (I will switch to SASL_SSL later).
I have 3 brokers running on one box with different ports
listeners = SASL_PLAINTEXT://:9092
listeners = SASL_PLAINTEXT://:9093
listeners = SASL_PLAINTEXT://:9094

0. Changed broker.properties
listeners = SASL_PLAINTEXT://:9093

sasl.enabled.mechanisms = [SCRAM-SHA-256]
sasl.mechanism.inter.broker.protocol = SCRAM-SHA-256
security.inter.broker.protocol = SASL_PLAINTEXT

1. Created an admin user for the brokers
 bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config
'SCRAM-SHA-256=password=admin-secret,SCRAM-SHA-512=password=admin-secret'
--entity-type users --entity-name admin
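
For reference, the credentials written in step 1 can be checked with the
same tool:
 bin/kafka-configs.sh --zookeeper localhost:2181 --describe
--entity-type users --entity-name admin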

2. Created a jaas.conf file in the config dir: config/kafka_server_jaas.conf

 KafkaServer {
org.apache.kafka.common.security.plain.ScramLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};

3. Added export
KAFKA_OPTS="-Djava.security.auth.login.config=config/kafka_server_jaas.conf"

But I can start only one broker; the moment I start the second broker I
get exceptions like these:

[2017-08-02 04:30:36,733] DEBUG [Replica Manager on Broker 0]: Recording
follower broker 1 log read results:
ArrayBuffer((TNT_GRP_subgroup_getAttributeList_ACK-1,Fetch Data:
[FetchDataInfo(0 [0 : 0],[],false,None)], HW: [0], leaderLogStartOffset:
[0], leaderLogEndOffset: [0], followerLogStartOffset: [0], fetchTimeMs:
[1501648236733], readSize: [1048576], error: [NONE]))
 (kafka.server.ReplicaManager)

[2017-08-02 04:30:36,803] DEBUG Accepted connection from /:58816 on
/:9093 and assigned it to processor 2, sendBufferSize
[actual|requested]: [102400|102400] recvBufferSize [actual|requested]:
[102400|102400] (kafka.network.Acceptor)
[2017-08-02 04:30:36,803] DEBUG Processor 2 listening to new connection
from /:58816 (kafka.network.Processor)
[2017-08-02 04:30:36,803] DEBUG Set SASL server state to HANDSHAKE_REQUEST
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2017-08-02 04:30:36,803] DEBUG Handle Kafka request METADATA
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2017-08-02 04:30:36,803] DEBUG Set SASL server state to FAILED
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2017-08-02 04:30:36,803] DEBUG Connection with / disconnected
(org.apache.kafka.common.network.Selector)
java.io.IOException:
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected Kafka
request of type METADATA during SASL handshake.
at
org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:247)
at
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:76)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:374)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at kafka.network.Processor.poll(SocketServer.scala:499)
at kafka.network.Processor.run(SocketServer.scala:435)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.IllegalSaslStateException:
Unexpected Kafka request of type METADATA during SASL handshake.
[2017-08-02 04:30:36,905] DEBUG Accepted connection from /:58823 on
/:9093 and assigned it to processor 0, sendBufferSize
[actual|requested]: [102400|102400] recvBufferSize [actual|requested]:
[102400|102400] (kafka.network.Acceptor)
[2017-08-02 04:30:36,905] DEBUG Processor 0 listening to new connection
from /:58823 (kafka.network.Processor)
[2017-08-02 04:30:36,905] DEBUG Set SASL server state to HANDSHAKE_REQUEST
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2017-08-02 04:30:36,905] DEBUG Handle Kafka request METADATA
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2017-08-02 04:30:36,905] DEBUG Set SASL server state to FAILED
(org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2017-08-02 04:30:36,905] DEBUG Connection with / disconnected
(org.apache.kafka.common.network.Selector)
java.io.IOException:
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected Kafka
request of type METADATA during SASL handshake.

Adding separate jaas.conf files for each broker, with different users,
didn't change anything.

Question - should each broker use a separate user for inter-broker
communication? Or are the exceptions caused by the brokers being set up on
one IP?
Any hints would be highly appreciated.
Thx,
-AL

On Mon, Jul 31, 2017 at 11:08 PM, Manikumar 
wrote:

> A server restart is required only if you are using the SASL/PLAIN mechanism.
> For other mechanisms (Kerberos, SCRAM) a restart is not required.
>
> https://issues.apache.org/jira/browse/KAFKA-4292 will help us to write
> custom handlers.
>
> On Tue, Aug 1, 2017 at 4:26 AM, Alexei Levashov <
> alexei.levas...@arrayent.com> wrote:
>

Re: Schema Registry Maven Artifacts Missing?

2017-08-01 Thread Debasish Ghosh
You can try this:

"io.confluent"  % "kafka-avro-serializer"% "3.2.2"

regards.

On Wed, Aug 2, 2017 at 9:08 AM, Chaoran Yu  wrote:

> Does anyone know what artifacts I need to include in my project in order
> to use Schema Registry?
>
> I looked at this SO link:
> https://stackoverflow.com/questions/37317567/how-to-use-the-avro-serializer-with-schema-registry-from-a-kafka-connect-sourcet
> But the artifacts referred to there can’t be found at Maven Central:
> https://mvnrepository.com/artifact/io.confluent.kafka
>
> Thanks in advance!
>
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Schema Registry Maven Artifacts Missing?

2017-08-01 Thread Chaoran Yu
Does anyone know what artifacts I need to include in my project in order to use 
Schema Registry?

I looked at this SO link: 
https://stackoverflow.com/questions/37317567/how-to-use-the-avro-serializer-with-schema-registry-from-a-kafka-connect-sourcet
 

But the artifacts referred to there can’t be found at Maven Central: 
https://mvnrepository.com/artifact/io.confluent.kafka 


Thanks in advance!



Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-01 Thread Jan Filipiak

Hi all,

after some further discussions, the best way to show my idea of how it
should evolve is a bigger mock/interface description.
The goal is to reduce the store-maintaining processors to only the
aggregators and KTableSource, while having KTableSource optionally
materialized.


Introducing KTable::copy() will allow users to maintain state twice if
they really want to. KStream::join*() wasn't touched; I never personally
used it, so I didn't feel comfortable enough touching it. I'm still making
up my mind. None of the suggestions made it queryable so far. Guozhang's
'Buffered' idea seems ideal here.


Please have a look. Looking forward to your opinions.

Best Jan



On 21.06.2017 17:24, Eno Thereska wrote:

(cc’ing user-list too)

Given that we already have StateStoreSuppliers that are configurable using the 
fluent-like API, probably it’s worth discussing the other examples with joins 
and serdes first since those have many overloads and are in need of some TLC.

So following your example, I guess you’d have something like:
.join()
.withKeySerdes(…)
.withValueSerdes(…)
.withJoinType(“outer”)

etc?

I like the approach since it still remains declarative and it’d reduce the 
number of overloads by quite a bit.

Eno


On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:

Hi,

I'd like to get a discussion going around some of the API choices we've
made in the DSL, in particular those that relate to stateful operations
(though this could expand).
As it stands we lean heavily on overloaded methods in the API, i.e., there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
feel it is only going to get worse as we add more optional params. In
particular we've had some requests to be able to turn caching off, or
change changelog configs, on a per-operator basis (note this can be done
now if you pass in a StateStoreSupplier, but that can be a bit cumbersome).
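
For readers following along, the StateStoreSupplier route mentioned above
looks roughly like this in the current API (a from-memory sketch, with topic
and store names purely illustrative):

KStreamBuilder builder = new KStreamBuilder();
KGroupedStream<String, String> grouped = builder
    .stream(Serdes.String(), Serdes.String(), "words")
    .groupByKey(Serdes.String(), Serdes.String());

// per-store options (name, serdes, persistence) are configured on the supplier
KTable<String, Long> counts = grouped.count(
    Stores.create("counts-store")
          .withKeys(Serdes.String())
          .withValues(Serdes.Long())
          .persistent()
          .build());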

So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?

One option would be to use a fluent API approach for providing the optional
params, so something like this:

groupedStream.count()
   .withStoreName("name")
   .withCachingEnabled(false)
   .withLoggingEnabled(config)
   .table()



Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new
CountBuilder("storeName").withCachingEnabled(false).build())

Another option is to say: Hey we don't need this, what are you on about!

The above has focussed on state store related overloads, but the same ideas
could  be applied to joins etc, where we presently have many join methods
and many overloads.

Anyway, I look forward to hearing your opinions.

Thanks,
Damian



@InterfaceStability.Evolving
public interface KTable<K, V> {

    KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);
    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
    <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);

    KStream<K, V> toStream();

    KTable<K, V> copy();               // inserts a new KTableSource
    KTable<K, V> copy(Materialized m); // inserts a new KTableSource using toStream() as parent


    // I see why, I'd rather have users using to + table
    KTable<K, V> through(final String topic);
    KTable<K, V> through(final Produced<K, V> p,
                         final String topic);

    void to(final String topic);
    void to(final Produced<K, V> produced,
            final String topic);

    <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
    <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                           final Serialized<KR, VR> s);

    <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

    <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                    final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

    <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                     final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

    UninitializedQueryHandle<K, V> queryHandle(); // causes enabling sending old values / materialization



    // Currently marked deprecated, easily reproduced by map or similar
    void writeAsText(final String filePath);
    void writeAsText(final String filePath,
                     final String streamName);
    void writeAsText(final String filePath,
                     final Serde<K> keySerde,
                     final Serde<V> valSerde);
    void writeAsText(final String filePath,
                     final String streamName,
                     final Serde<K> keySerde,
                     final Serde<V> valSerde);
    void foreach(final ForeachAction<? super K, ? super V> action);
}


public interface UninitializedQueryHandle<K, V> {

    QueryHandle<K, V> initialize(KafkaStreams ks);
}

public interface QueryHandle<K, V> {

    V get(K k);

}

public interface Produced<K, V> {

    static <K, V> Produced<K, V> with();

Confluent Kafka 3.2.2 - rebalancing not happenning

2017-08-01 Thread karan alang
Hi All -
I'm trying to rebalance a Kafka topic (refer to link ->
http://docs.confluent.io/current/kafka/rebalancer/rebalancer.html), and
somehow the rebalancing is not working.


Here is what I'm doing ->
- I have 4 Kafka brokers & I've made changes to the server.properties files to
enable Confluent Metrics Reporter.
(attached are the server.properties of the 4 brokers)

-> Created a topic specifying Replica assignment

./bin/kafka-topics --create --topic topic-a1 --replica-assignment
0:1,0:1,0:1,0:1 --zookeeper nwk2-bdp-kafka-04.gdcs-qa.apple.com:3181

-> describe topic

./bin/kafka-topics --describe --topic topic-a1 --zookeeper
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:3181
> Topic:topic-a1 PartitionCount:4 ReplicationFactor:2 Configs:
> Topic: topic-a1 Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
> Topic: topic-a1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1
> Topic: topic-a1 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
> Topic: topic-a1 Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1
>


-> Produce data into topics, using the following command

./bin/kafka-producer-perf-test --topic topic-a1 --num-records 20
> --record-size 1000 --throughput 1000 --producer-props bootstrap.servers=
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9092,
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9082,
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9072,
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9062



-> Force Creation of offsets topic, by creating a Consumer (NOT SURE WHAT
THIS IS FOR ???) :

./bin/kafka-consumer-perf-test --topic topic-a1 --broker-list
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9092,
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9082,
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9072,
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9062 --messages 10



-> run the following command to rebalance

>
> The plan that is presented does not really do any rebalancing ->

./bin/confluent-rebalancer execute --zookeeper
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:3181 --metrics-bootstrap-server
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9092,
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9082,
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9072,
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:9062 --throttle 1000 --verbose
>
> Computing the rebalance plan (this may take a while) ...
> You are about to move 0 replica(s) for 0 partitions to 0 broker(s) with
> total size 0 MB.
> The preferred leader for 2 partition(s) will be changed.
> In total, the assignment for 2 partitions will be changed.
> The minimum free volume space is set to 20.0%.
> The following brokers will have less than 40% of free volume space during
> the rebalance:
> Broker Current Size (MB)  Size During Rebalance (MB)   Free % During
> Rebalance  Size After Rebalance (MB)Free % After Rebalance
> 0  4,021.14,021.1  14.2
>   4,021.1  14.2
> 1  1,240.81,240.8  14.2
>   1,240.8  14.2
> 2  620.4  620.414.2
>   620.414.2
> 3  0  014.2
>   014.2
> Min/max stats for brokers (before -> after):
> Type  Leader Count Replica CountSize (MB)
>
> Min   0 (id: 3) -> 0 (id: 3)   0 (id: 3) -> 0 (id: 3)   0 (id: 3)
> -> 0 (id: 3)
> Max   125 (id: 0) -> 123 (id: 0)   127 (id: 0) -> 127 (id: 0)   4,021.1
> (id: 0) -> 4,021.1 (id: 0)
> No racks are defined.
> Broker stats (before -> after):
> Broker Leader CountReplica Count   Size (MB)Free
> Space (%)
> 0  125 -> 123  127 -> 127  4,021.1 -> 4,021.1   14.2 ->
> 14.2
> 1  3 -> 5  12 -> 121,240.8 -> 1,240.8   14.2 ->
> 14.2
> 2  2 -> 2  3 -> 3  620.4 -> 620.4   14.2 ->
> 14.2
> 3  0 -> 0  0 -> 0  0 -> 0   14.2 ->
> 14.2
> Would you like to continue? (y/n): y
> The rebalance has been started, run `status` to check progress.
> Warning: You must run the `status` or `finish` command periodically, until
> the rebalance completes, to ensure the throttle is removed. You can also
> alter the throttle by re-running the execute command passing a new value.
>
>
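
For reference, the follow-up commands mentioned in that warning look roughly
like this (same zookeeper address as in the execute call; flags worth
double-checking against the Confluent rebalancer docs):

./bin/confluent-rebalancer status --zookeeper nwk2-bdp-kafka-04.gdcs-qa.apple.com:3181
./bin/confluent-rebalancer finish --zookeeper nwk2-bdp-kafka-04.gdcs-qa.apple.com:3181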

-> I describe the topic, after rebalancing

./bin/kafka-topics --describe --topic topic-a1 --zookeeper
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:3181
> Topic:topic-a1 PartitionCount:4 ReplicationFactor:2 Configs:
> Topic: topic-a1 Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
> Topic: topic-a1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1
> Topic: topic-a1 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
> Topic: topic-a1 Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1
>
>

My expectation was that the topic would have all 4 brokers as leaders for
the 4 partitions, but that does not seem to be happening.

Any ideas what the issue is ?

Re: Need clarification on Kafka Usage within our product..

2017-08-01 Thread jan
Don't know if it helps but  says at the bottom

" The contents of this website are © 2016 Apache Software Foundation
under the terms of the Apache License v2. Apache Kafka, Kafka, and the
Kafka logo are either registered trademarks or trademarks of The
Apache Software Foundation in the United States and other countries. "

Also this which is the original proposition and voting for a kafka
logo. Perhaps the proposer can be contacted; he seems to be the one
that designed the logo


You could check out  and see
if it has any statements on logo use.

Also top 3 hits of 
sound promising but I've not looked at them.

Best I can suggest ATM

jan

On 01/08/2017, Sunil, Rinu  wrote:
> Including another mail id which I found online.   Kindly help in addressing
> the below query.
>
>
>
> Thanks,
>
> Rinu
>
>
>
> From: Sunil, Rinu
> Sent: Monday, July 31, 2017 7:19 PM
> To: 'users@kafka.apache.org' 
> Subject: Need clarification on Kafka Usage within our product..
> Importance: High
>
>
>
> Hi,
>
>
>
> I have a question regarding the usage of Apache Kafka logo within our
> product Unisys Data Exchange WorkBench Application.Team is working on
> enhancing the product to support Kafka as Data Manage Type with XSD message
> format along with other database types like SQL Server, DMSII etc...   To
> help users easily distinguish the Kafka XSD Database in the tree view we
> have used Kafka logo with a blue overlapping strip with an "x" character to
> indicate XSD message format.  Could you please verify the below image
> highlighted with a yellow border and confirm if it's ok to use?  I could not
> find Kafka logo compliance guidance online.
>
> [embedded image not included in the archive]
>
> Thanks,
>
> Rinu
>
>


Re: Kafka Streams Application crashing on Rebalance

2017-08-01 Thread Eric Lalonde

> On Aug 1, 2017, at 10:01 AM, Damian Guy  wrote:
> 
> Hi, Yes the issue is in 0.10.2 also.

Hi,

Any chance of a backport to 0.10.2.x? We have lots and lots of state stores. :)



Re: Kafka Streams Application crashing on Rebalance

2017-08-01 Thread Marcus Clendenin
I'll work on moving to 0.11. Thanks

On Tue, Aug 1, 2017, 1:02 PM Damian Guy  wrote:

> Hi, Yes the issue is in 0.10.2 also.
>
> On Tue, 1 Aug 2017 at 17:37 Eric Lalonde  wrote:
>
> >
> > > On Aug 1, 2017, at 8:00 AM, Damian Guy  wrote:
> > >
> > > It is a bug in 0.10.2 or lower. It has been fixed in 0.11 by
> > > https://issues.apache.org/jira/browse/KAFKA-4494
> >
> > Hi Damien, the Affects Version is set to 0.10.1.0 in KAFKA-4494. Is the
> > issue in 0.10.2.0 as well?
>


Re: increased response time for OffsetCommit requests

2017-08-01 Thread Apurva Mehta
Sorry to keep prodding you with questions, but can you quantify the
increase for the ProduceRequest? What is the workload you are testing
against: specifically the batch size, message size, and linger time settings
of the producers in question?

I ask because we benchmarked 0.11.0 against the older 0.10.0 message format
and found no difference in performance between 0.10.2 on the 0.10
message format and 0.11.0 on the 0.10 message format.  Could you create a
topic with the 0.10.0 message format and see if there is any degradation
for the same workload?
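
A sketch of how such a test topic could be created (partition/replication
counts and the ZooKeeper address are illustrative):

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic perf-test-0-10 \
  --partitions 8 --replication-factor 3 --config message.format.version=0.10.0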

Thanks,
Apurva


On Tue, Aug 1, 2017 at 2:51 AM, Gaurav Abbi  wrote:

> Hi Apurva,
> There are increases in the *Produce* request also. It is not as substantial
> as compared to *OffsetCommit. *For both of these requests, the major
> contributor is Remote time.
> A couple of other metrics that show different behavior post upgrade:
>
>1. *LogStartOffset*: It has drastically decreased.
>2. *NumDelayedOperations: *It has dropped.
>
> These could be related or may be these are intended good changes in Kafka
> 0.11.0.0 or one of the previous versions.
>
> Best Regards,
> Gaurav Abbi
>
> On Tue, Aug 1, 2017 at 12:11 AM, Apurva Mehta  wrote:
>
> > Thanks for your response. Is it 200% only for the OffsetCommitRequest, or
> > is it similar for all the requests?
> >
> >
> > On Mon, Jul 31, 2017 at 12:48 PM, Gaurav Abbi 
> > wrote:
> >
> > > Hi Apurva,
> > > 1. The increase is about 200%.
> > > 2. There is no increase in throughput. However,  this has caused in
> error
> > > rate and a decrease in the responses received per second.
> > >
> > >
> > > One more thing to mention, we also upgraded to 0.11.0.0 client
> libraries.
> > > We are currently using old Producer and consumer APIs.
> > >
> > >
> > >
> > > Best Regards,
> > > Gaurav Abbi
> > >
> > > On Mon, Jul 31, 2017 at 7:46 PM, Apurva Mehta 
> > wrote:
> > >
> > > > How much is the increase? Is there any increase in throughput?
> > > >
> > > > On Mon, Jul 31, 2017 at 8:04 AM, Gaurav Abbi 
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > > We recently upgraded to Kafka 0.11.0.0 from 0.10.1.1.
> > > > > Since then we have been observing increased latencies especially
> > > > > OffsetCommit requests.
> > > > > Looking at the server side metrics, it seems the culprit is the
> > > Follower
> > > > > time.
> > > > >
> > > > > We are using following
> > > > > inter.broker.protocol.version: 0.11.0.0
> > > > > log.message.format.version: 0.9.0.1
> > > > >
> > > > > Are there some possible pointers that we can explore to
> troubleshoot
> > > the
> > > > > root cause?
> > > > >
> > > > > Best Regards,
> > > > > Gaurav Abbi
> > > > >
> > > >
> > >
> >
>


Re: Kafka Streams Application crashing on Rebalance

2017-08-01 Thread Damian Guy
Hi, Yes the issue is in 0.10.2 also.

On Tue, 1 Aug 2017 at 17:37 Eric Lalonde  wrote:

>
> > On Aug 1, 2017, at 8:00 AM, Damian Guy  wrote:
> >
> > It is a bug in 0.10.2 or lower. It has been fixed in 0.11 by
> > https://issues.apache.org/jira/browse/KAFKA-4494
>
> Hi Damien, the Affects Version is set to 0.10.1.0 in KAFKA-4494. Is the
> issue in 0.10.2.0 as well?


Re: Kafka Streams Application crashing on Rebalance

2017-08-01 Thread Eric Lalonde

> On Aug 1, 2017, at 8:00 AM, Damian Guy  wrote:
> 
> It is a bug in 0.10.2 or lower. It has been fixed in 0.11 by
> https://issues.apache.org/jira/browse/KAFKA-4494

Hi Damien, the Affects Version is set to 0.10.1.0 in KAFKA-4494. Is the issue 
in 0.10.2.0 as well?

Re: Monitor all stream consumers for lag

2017-08-01 Thread Garrett Barton
Oh that makes a lot of sense now that I think of it.  GlobalStores cannot
be part of a group since they have to consume the entire stream per
instance.

On Tue, Aug 1, 2017 at 10:18 AM, Damian Guy  wrote:

> Hi Garrett,
>
> The global state store doesn't use consumer groups and doesn't commit
> offsets. The offsets are checkpointed to local disk, so they won't show up
> with the ConsumerGroupCommand.
>
> That said it would be useful to see the lag, so maybe raise a JIRA for it?
>
> Thanks,
> Damian
>
> On Tue, 1 Aug 2017 at 15:06 Garrett Barton 
> wrote:
>
> > I have a simple stream setup which reads a source topic and forks to an
> > aggregation with its own statestore, and a flatmap().to("topic1") and
> that
> > topic is read in to a global state store.
> >
> > I use ConsumerGroupCommand to query for the lag of each consumer on the
> > topics.
> >
> > https://github.com/apache/kafka/blob/trunk/core/src/
> main/scala/kafka/admin/ConsumerGroupCommand.scala
> >
> > It seems like ConsumerGroupCommand only shows some consumers, but not
> all.
> > I can see the consumer for the original source topic, but I don't see one
> > for 'topic1', yet the globalstatestore is populated.
> >
> > How can I see the lag of the globalstatestore consumer?
> >
>


Re: Kafka Streams Application crashing on Rebalance

2017-08-01 Thread Damian Guy
It is a bug in 0.10.2 or lower. It has been fixed in 0.11 by
https://issues.apache.org/jira/browse/KAFKA-4494

On Tue, 1 Aug 2017 at 15:40 Marcus Clendenin  wrote:

> Hi All,
>
>
>
> I have a kafka streams application that is doing a join between a KTable
> and a KStream and it seems that after it starts loading the KTable if I
> either restart the application or start a new jar with the same
> application-id it starts failing. It looks like when it tries to rejoin the
> application-id and do a rebalance of the partitions it throws an error
> regarding a null value coming from RocksDB. Any thoughts on where this is
> coming from? I am running this inside of a docker container if that affects
> anything but the RocksDB folder is mounted as a volume on the host machine.
>
>
>
>
>
> Stacktrace:
>
>
>
> 2017-08-01 13:31:50,309 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Starting
>
> 2017-08-01 13:31:50,379 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.k.c.consumer.internals.AbstractCoordinator  - Discovered coordinator
> .com:9092 (id: 2147483535 rack: null) for group
> test-application-id.
>
> 2017-08-01 13:31:50,386 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.k.c.consumer.internals.ConsumerCoordinator  - Revoking previously
> assigned partitions [] for group test-application-id
>
> 2017-08-01 13:31:50,386 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] at state RUNNING: partitions [] revoked at the beginning
> of consumer rebalance.
>
> 2017-08-01 13:31:50,387 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED.
>
> 2017-08-01 13:31:50,387 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams org.apache.kafka.streams.KafkaStreams
> - stream-client [test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c]
> State transition from RUNNING to REBALANCING.
>
> 2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Updating suspended tasks to contain active tasks []
>
> 2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Removing all active tasks []
>
> 2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
> [StreamThread-1] Removing all standby tasks []
>
> 2017-08-01 13:31:50,389 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.k.c.consumer.internals.AbstractCoordinator  - (Re-)joining group
> test-application-id
>
> 2017-08-01 13:31:50,416 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
> [StreamThread-1] Constructed client metadata
> {67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c=ClientMetadata{hostInfo=null,
>
> consumers=[test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c-StreamThread-1-consumer-f6ed6af8-0aee-4d2e-92a9-00955f7b3441],
> state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([])
> prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member
> subscriptions.
>
> 2017-08-01 13:31:50,417 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
> [StreamThread-1] Completed validating internal topics in partition assignor
>
> 2017-08-01 13:31:50,417 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
> [StreamThread-1] Completed validating internal topics in partition assignor
>
> 2017-08-01 13:31:50,419 trackingId=X thread=[StreamThread-1] logType=INFO
>
> module=kafka.streams
> o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
> [StreamThread-1] Assigned tasks to clients as
> {67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c=[activeTasks: ([0_0, 0_1, 0_2, 0_3,
> 0_4, 0_5]) assignedTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 0_5]) prevActiveTasks:
> ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 3.0]}.
>
> 2017-08-01 13:31:50,429 trackingId=X thread=[StreamThread-1] logType=INFO
>
> 

Kafka Streams Application crashing on Rebalance

2017-08-01 Thread Marcus Clendenin
Hi All,



I have a Kafka Streams application that is doing a join between a KTable
and a KStream, and it seems that after it starts loading the KTable, if I
either restart the application or start a new jar with the same
application-id, it starts failing. It looks like when it tries to rejoin
under the application-id and rebalance the partitions, it throws an error
about a null value coming from RocksDB. Any thoughts on where this is
coming from? I am running this inside of a Docker container, if that affects
anything, but the RocksDB folder is mounted as a volume on the host machine.





Stacktrace:



2017-08-01 13:31:50,309 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Starting

2017-08-01 13:31:50,379 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.k.c.consumer.internals.AbstractCoordinator  - Discovered coordinator
.com:9092 (id: 2147483535 rack: null) for group test-application-id.

2017-08-01 13:31:50,386 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.k.c.consumer.internals.ConsumerCoordinator  - Revoking previously
assigned partitions [] for group test-application-id

2017-08-01 13:31:50,386 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] at state RUNNING: partitions [] revoked at the beginning
of consumer rebalance.

2017-08-01 13:31:50,387 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED.

2017-08-01 13:31:50,387 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams org.apache.kafka.streams.KafkaStreams
- stream-client [test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c]
State transition from RUNNING to REBALANCING.

2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Updating suspended tasks to contain active tasks []

2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Removing all active tasks []

2017-08-01 13:31:50,388 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.kafka.streams.processor.internals.StreamThread  - stream-thread
[StreamThread-1] Removing all standby tasks []

2017-08-01 13:31:50,389 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.k.c.consumer.internals.AbstractCoordinator  - (Re-)joining group
test-application-id

2017-08-01 13:31:50,416 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
[StreamThread-1] Constructed client metadata
{67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c=ClientMetadata{hostInfo=null,
consumers=[test-application-id-67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c-StreamThread-1-consumer-f6ed6af8-0aee-4d2e-92a9-00955f7b3441],
state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([])
prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member
subscriptions.

2017-08-01 13:31:50,417 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
[StreamThread-1] Completed validating internal topics in partition assignor

2017-08-01 13:31:50,417 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
[StreamThread-1] Completed validating internal topics in partition assignor

2017-08-01 13:31:50,419 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.k.s.p.internals.StreamPartitionAssignor  - stream-thread
[StreamThread-1] Assigned tasks to clients as
{67f96d6e-d1fd-4f31-8ec4-45e82a9cf01c=[activeTasks: ([0_0, 0_1, 0_2, 0_3,
0_4, 0_5]) assignedTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 0_5]) prevActiveTasks:
([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 3.0]}.

2017-08-01 13:31:50,429 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.k.c.consumer.internals.AbstractCoordinator  - Successfully joined group
test-application-id with generation 56

2017-08-01 13:31:50,430 trackingId=X thread=[StreamThread-1] logType=INFO

module=kafka.streams
o.a.k.c.consumer.internals.ConsumerCoordinator  - Setting newly assigned
partitions [stream_topic-0, stream_topic-1, stream_topic-2, 

Re: Getting error while consuming data from broker

2017-08-01 Thread M. Manna
Mahesh,

Thanks for sharing the info. Is having "exactly" 8 brokers a "must" for
you? Because one of them is technically unnecessary, since your cluster can
only tolerate 3 failures (even with 7 brokers).
Could you please try the following (a rough sketch follows the steps):

1) Stop the cluster.
2) Increase num.recovery.threads.per.data.dir from 1 to 5
(I mean something bigger than 1).
3) Retry creating a topic with both partitions and replication-factor set
to 8.
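
Roughly, for steps 2 and 3 (property name per the broker config docs;
ZooKeeper address and topic name are illustrative):

# server.properties on each broker
num.recovery.threads.per.data.dir=5

# after restarting the cluster
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test-8x8 \
  --partitions 8 --replication-factor 8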

Let us know how it goes.

Regards,

On 31 July 2017 at 18:44, Mahesh Patade  wrote:

> Hi All,
> We are having 8 broker kafka cluster configured in our setup and created a
> topic with 8 partitions & 3 replicas. While trying to consume data from one
> broker(id:6) we are getting below errors and increase in lag for active
> partition on that host. We even tried restarting, deleting logs and
> reinstalling broker but still no luck. This is happening only for one
> broker. If we shut down that broker service  everything works well.
> Kafka Server Logs:
> [2017-07-31 19:32:39,212] INFO [Group Metadata Manager on Broker 6]:
> Finished loading offsets from __consumer_offsets-5 in 5 milliseconds.
> (kafka.coordinator.GroupMe
> tadataManager)
> [2017-07-31 19:32:39,217] INFO [ReplicaFetcherManager on broker 6] Removed
> fetcher for partitions __consumer_offsets-13 (kafka.server.
> ReplicaFetcherManager)
> [2017-07-31 19:32:39,218] INFO [Group Metadata Manager on Broker 6]:
> Loading offsets and group metadata from __consumer_offsets-13
> (kafka.coordinator.GroupMetadataMa
> nager)
> [2017-07-31 19:32:39,220] INFO [ReplicaFetcherManager on broker 6] Removed
> fetcher for partitions __consumer_offsets-37 (kafka.server.
> ReplicaFetcherManager)
> [2017-07-31 19:32:39,230] INFO [ReplicaFetcherManager on broker 6] Removed
> fetcher for partitions __consumer_offsets-45 (kafka.server.
> ReplicaFetcherManager)
> [2017-07-31 19:32:39,232] INFO [Group Metadata Manager on Broker 6]:
> Finished loading offsets from __consumer_offsets-13 in 13 milliseconds.
> (kafka.coordinator.Group
> MetadataManager)
> [2017-07-31 19:32:39,232] INFO [Group Metadata Manager on Broker 6]:
> Loading offsets and group metadata from __consumer_offsets-37
> (kafka.coordinator.GroupMetadataMa
> nager)
> [2017-07-31 19:32:39,248] INFO [ReplicaFetcherManager on broker 6] Removed
> fetcher for partitions bseEquityAggFeedProd-4 (kafka.server.
> ReplicaFetcherManager)
> [2017-07-31 19:32:39,249] INFO Truncating log bseEquityAggFeedProd-4 to
> offset 13082598. (kafka.log.Log)
> [2017-07-31 19:32:39,276] INFO [ReplicaFetcherManager on broker 6] Added
> fetcher for partitions List([bseEquityAggFeedProd-4, initOffset 13082598 to
> broker BrokerEnd
> Point(2,172.29.51.52,9092)] ) (kafka.server.ReplicaFetcherManager)
> [2017-07-31 19:32:39,277] INFO [ReplicaFetcherThread-0-2], Starting
> (kafka.server.ReplicaFetcherThread)
> [2017-07-31 19:32:39,294] INFO [ReplicaFetcherManager on broker 6] Removed
> fetcher for partitions __consumer_offsets-17 (kafka.server.
> ReplicaFetcherManager)
> [2017-07-31 19:32:39,294] INFO Truncating log __consumer_offsets-17 to
> offset 0. (kafka.log.Log)
> [2017-07-31 19:32:39,300] INFO [ReplicaFetcherManager on broker 6] Added
> fetcher for partitions List([__consumer_offsets-17, initOffset 0 to broker
> BrokerEndPoint(2,
> 172.29.51.52,9092)] ) (kafka.server.ReplicaFetcherManager)
> [2017-07-31 19:32:39,304] ERROR [ReplicaFetcherThread-0-5], Error for
> partition [__consumer_offsets,9] to broker 5:org.apache.kafka.common.
> errors.NotLeaderForPartiti
> onException: This server is not the leader for that topic-partition.
> (kafka.server.ReplicaFetcherThread)
> [2017-07-31 19:32:39,308] ERROR [ReplicaFetcherThread-0-2], Error for
> partition [bseEquityAggFeedProd,4] to broker 2:org.apache.kafka.common.
> errors.NotLeaderForParti
> tionException: This server is not the leader for that topic-partition.
> (kafka.server.ReplicaFetcherThread)
> [2017-07-31 19:32:39,313] ERROR [ReplicaFetcherThread-0-2], Error for
> partition [__consumer_offsets,17] to broker 2:org.apache.kafka.common.
> errors.NotLeaderForPartit
> ionException: This server is not the leader for that topic-partition.
> (kafka.server.ReplicaFetcherThread)
> [2017-07-31 19:32:39,318] INFO [ReplicaFetcherManager on broker 6] Removed
> fetcher for partitions __consumer_offsets-9 (kafka.server.
> ReplicaFetcherManager)
> [2017-07-31 19:32:39,319] INFO Truncating log __consumer_offsets-9 to
> offset 392. (kafka.log.Log)
> [2017-07-31 19:32:39,324] INFO [ReplicaFetcherManager on broker 6] Added
> fetcher for partitions List([__consumer_offsets-9, initOffset 392 to broker
> BrokerEndPoint(2
> ,172.29.51.52,9092)] ) (kafka.server.ReplicaFetcherManager)
> [2017-07-31 19:32:39,339] ERROR [ReplicaFetcherThread-0-2], Error for
> partition [__consumer_offsets,9] to broker 2:org.apache.kafka.common.
> errors.NotLeaderForPartiti
> onException: This server is not the leader for that topic-partition.
> 

Re: Monitor all stream consumers for lag

2017-08-01 Thread Damian Guy
Hi Garrett,

The global state store doesn't use consumer groups and doesn't commit
offsets. The offsets are checkpointed to local disk, so they won't show up
with the ConsumerGroupCommand.
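
For the consumers that do show up, the lag query is typically along these
lines; with a Streams app the group id is the application.id (broker address
illustrative):

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-streams-application-id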

That said it would be useful to see the lag, so maybe raise a JIRA for it?

Thanks,
Damian

On Tue, 1 Aug 2017 at 15:06 Garrett Barton  wrote:

> I have a simple stream setup which reads a source topic and forks to an
> aggregation with its own statestore, and a flatmap().to("topic1") and that
> topic is read in to a global state store.
>
> I use ConsumerGroupCommand to query for the lag of each consumer on the
> topics.
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
>
> It seems like ConsumerGroupCommand only shows some consumers, but not all.
> I can see the consumer for the original source topic, but I don't see one
> for 'topic1', yet the globalstatestore is populated.
>
> How can I see the lag of the globalstatestore consumer?
>


Re: Kafka streams store migration - best practices

2017-08-01 Thread Damian Guy
No, you don't need to set a listener. I was just mentioning it as an option if
you want to know that the metadata needs refreshing.

On Tue, 1 Aug 2017 at 13:25 Debasish Ghosh  wrote:

> Regarding the last point, do I need to set up the listener ?
>
> All I want is to do a query from the store. For that I need to invoke 
> streams.store()
> first, which can potentially throw an InvalidStateStoreException during
> rebalancing / migration of stores. If I call streams.store() with retries
> till the rebalancing is done or I exceed some max retry count, then I think
> I should be good.
>
> Or am I missing something ?
>
> regards.
>
> On Tue, Aug 1, 2017 at 1:10 PM, Damian Guy  wrote:
>
>> Hi,
>>
>> On Tue, 1 Aug 2017 at 08:34 Debasish Ghosh 
>> wrote:
>>
>>> Hi -
>>>
>>> I have a Kafka Streams application that needs to run on multiple
>>> instances.
>>> It fetches metadata from all local stores and has an http query layer for
>>> interactive queries. In some cases when I have new instances deployed,
>>> store migration takes place making the current metadata invalid. Here are
>>> my questions regarding some of the best practices to be followed to
>>> handle
>>> this issue of store migration -
>>>
>>>- When the migration is in process, a query for the metadata may
>>> result
>>>in InvalidStateStoreException - is it a good practice to always have a
>>>retry semantics based query for the metadata ?
>>>
>>
>> Yes. Whenever the application is rebalancing the stores will be
>> unavailable, so retrying is the right thing to do.
>>
>>
>>>- Should I check KafkaStreams.state() and only assume that I have got
>>>the correct metadata when the state() call returns Running. If it
>>>returns Rebalancing, then I should re-query. Is this correct approach
>>> ?
>>>
>>
>> Correct again! If the state is rebalancing, then the metadata (for some
>> stores at least) is going to change, so you should get it again. You can
>> set a StateListener on the KafkaStreams instance to listen to these events.
>>
>>
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


Why?? Kafka High Level Consumer is 0.8.2.2 Stop Responding After Few Hours... Timeout in Broker Log

2017-08-01 Thread Rachana Srivastava
I have written a high-level Kafka consumer which stops responding to events
sent after 2 hours.

The only error I see in the broker log is this:

2017-08-01 05:29:42,102 INFO kafka.network.Processor: Closing socket 
connection to /10.202.138.126.
2017-08-01 07:34:24,957 ERROR kafka.network.Processor: Closing socket for 
/XX:XXX.110.61 because of error
java.io.IOException: Connection timed out
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:445)
at kafka.network.Processor.run(SocketServer.scala:341)
at java.lang.Thread.run(Thread.java:745)

I have written a simple Kafka high-level consumer.  I have not specified any
value for consumer.timeout.ms, therefore I assume the consumer will never
time out when no message is received from the producer.  I have observed that
when we do not receive any message from the producer for some time, the
consumer stops responding to any message that is received after, say, 10 min.
I have set
zookeeper.sync.time.ms=6000
and zookeeper.session.timeout.ms=50


High level Consumer:

Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
    consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
try {
    executor = Executors.newFixedThreadPool(threadCount);

    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new EmailProcessorThread(stream, threadNumber, context, redisTemplate));

// Thread run()
public void run() {
    while (true) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            try {
                ObjectMapper mapper = new ObjectMapper();
                String recordValue = new String(it.next().message());
                LOG.info("Message received from the topic is " + recordValue);




Getting error while consuming data from broker

2017-08-01 Thread Mahesh Patade
Hi All,
We have an 8-broker Kafka cluster configured in our setup and created a
topic with 8 partitions & 3 replicas. While trying to consume data from one
broker (id: 6) we are getting the errors below and an increase in lag for the
active partition on that host. We even tried restarting, deleting logs and
reinstalling the broker, but still no luck. This is happening only for one
broker. If we shut down that broker's service, everything works well.
Kafka Server Logs:
[2017-07-31 19:32:39,212] INFO [Group Metadata Manager on Broker 6]: Finished 
loading offsets from __consumer_offsets-5 in 5 milliseconds. 
(kafka.coordinator.GroupMe
tadataManager)
[2017-07-31 19:32:39,217] INFO [ReplicaFetcherManager on broker 6] Removed 
fetcher for partitions __consumer_offsets-13 
(kafka.server.ReplicaFetcherManager)
[2017-07-31 19:32:39,218] INFO [Group Metadata Manager on Broker 6]: Loading 
offsets and group metadata from __consumer_offsets-13 
(kafka.coordinator.GroupMetadataMa
nager)
[2017-07-31 19:32:39,220] INFO [ReplicaFetcherManager on broker 6] Removed 
fetcher for partitions __consumer_offsets-37 
(kafka.server.ReplicaFetcherManager)
[2017-07-31 19:32:39,230] INFO [ReplicaFetcherManager on broker 6] Removed 
fetcher for partitions __consumer_offsets-45 
(kafka.server.ReplicaFetcherManager)
[2017-07-31 19:32:39,232] INFO [Group Metadata Manager on Broker 6]: Finished 
loading offsets from __consumer_offsets-13 in 13 milliseconds. 
(kafka.coordinator.Group
MetadataManager)
[2017-07-31 19:32:39,232] INFO [Group Metadata Manager on Broker 6]: Loading 
offsets and group metadata from __consumer_offsets-37 
(kafka.coordinator.GroupMetadataMa
nager)
[2017-07-31 19:32:39,248] INFO [ReplicaFetcherManager on broker 6] Removed 
fetcher for partitions bseEquityAggFeedProd-4 
(kafka.server.ReplicaFetcherManager)
[2017-07-31 19:32:39,249] INFO Truncating log bseEquityAggFeedProd-4 to offset 
13082598. (kafka.log.Log)
[2017-07-31 19:32:39,276] INFO [ReplicaFetcherManager on broker 6] Added 
fetcher for partitions List([bseEquityAggFeedProd-4, initOffset 13082598 to 
broker BrokerEnd
Point(2,172.29.51.52,9092)] ) (kafka.server.ReplicaFetcherManager)
[2017-07-31 19:32:39,277] INFO [ReplicaFetcherThread-0-2], Starting  
(kafka.server.ReplicaFetcherThread)
[2017-07-31 19:32:39,294] INFO [ReplicaFetcherManager on broker 6] Removed 
fetcher for partitions __consumer_offsets-17 
(kafka.server.ReplicaFetcherManager)
[2017-07-31 19:32:39,294] INFO Truncating log __consumer_offsets-17 to offset 
0. (kafka.log.Log)
[2017-07-31 19:32:39,300] INFO [ReplicaFetcherManager on broker 6] Added 
fetcher for partitions List([__consumer_offsets-17, initOffset 0 to broker 
BrokerEndPoint(2,
172.29.51.52,9092)] ) (kafka.server.ReplicaFetcherManager)
[2017-07-31 19:32:39,304] ERROR [ReplicaFetcherThread-0-5], Error for partition 
[__consumer_offsets,9] to broker 
5:org.apache.kafka.common.errors.NotLeaderForPartiti
onException: This server is not the leader for that topic-partition. 
(kafka.server.ReplicaFetcherThread)
[2017-07-31 19:32:39,308] ERROR [ReplicaFetcherThread-0-2], Error for partition 
[bseEquityAggFeedProd,4] to broker 
2:org.apache.kafka.common.errors.NotLeaderForParti
tionException: This server is not the leader for that topic-partition. 
(kafka.server.ReplicaFetcherThread)
[2017-07-31 19:32:39,313] ERROR [ReplicaFetcherThread-0-2], Error for partition 
[__consumer_offsets,17] to broker 
2:org.apache.kafka.common.errors.NotLeaderForPartit
ionException: This server is not the leader for that topic-partition. 
(kafka.server.ReplicaFetcherThread)
[2017-07-31 19:32:39,318] INFO [ReplicaFetcherManager on broker 6] Removed 
fetcher for partitions __consumer_offsets-9 (kafka.server.ReplicaFetcherManager)
[2017-07-31 19:32:39,319] INFO Truncating log __consumer_offsets-9 to offset 
392. (kafka.log.Log)
[2017-07-31 19:32:39,324] INFO [ReplicaFetcherManager on broker 6] Added 
fetcher for partitions List([__consumer_offsets-9, initOffset 392 to broker 
BrokerEndPoint(2
,172.29.51.52,9092)] ) (kafka.server.ReplicaFetcherManager)
[2017-07-31 19:32:39,339] ERROR [ReplicaFetcherThread-0-2], Error for partition 
[__consumer_offsets,9] to broker 
2:org.apache.kafka.common.errors.NotLeaderForPartiti
onException: This server is not the leader for that topic-partition. 
(kafka.server.ReplicaFetcherThread)
[2017-07-31 19:32:39,986] INFO [GroupCoordinator 6]: Loading group metadata for 
bseEquityLegacyBod with generation 4 (kafka.coordinator.GroupCoordinator)
[2017-07-31 19:32:39,991] INFO [Group Metadata Manager on Broker 6]: Finished 
loading offsets from __consumer_offsets-37 in 759 milliseconds. 
(kafka.coordinator.Grou
pMetadataManager)

Consumer Logs:

17/07/31/19:26:20.005 2-thread-3  INFO nals.AbstractCoordinator Discovered 
coordinator 172.XX.51.56:9092 (id: 2147483641 rack: null) for group 
bseEquityLegacyBod.
17/07/31/19:26:20.005 2-thread-3  INFO nals.AbstractCoordinator Marking the 
coordinator 172.XX.51.56:9092 (id: 2147483641 rack: null) dead for 

Re: Unit testing changes in core Scala sources

2017-08-01 Thread Tom Bentley
Hi,

I'm no gradle expert, but it looks like gradle core:cleanTest core:test
would do what you want. Have you looked at the example command lines in the
README.md?
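
For example, from the checkout root (the test class name here is just
illustrative):

./gradlew core:cleanTest core:test
./gradlew core:test --tests "kafka.log.LogTest"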

Cheers,

Tom

On 27 July 2017 at 15:36, M. Manna  wrote:

> Hello,
>
> Please forgive me for asking too simple a question (since I haven't done any
> Scala development).
>
> I am trying to see if a fix works for Windows OS. I have made some changes
> in the core package and am trying to run the unitTest gradle command. The test
> already exists in the existing Kafka source code (so I am not writing duplicate
> tests :) ). But I cannot see the unit test passing at all.
>
> When I do *gradle jar -debug* it actually packages the jar and the solution
> works! So my question is "Which build/test command should I use to pick up
> changes in Scala files and run the unit tests to check changes"?
>
> Kindest Regards,
>


Re: increased response time for OffsetCommit requests

2017-08-01 Thread Gaurav Abbi
Hi Apurva,
There are increases in the *Produce* request also. It is not as substantial
as compared to *OffsetCommit. *For both of these requests, the major
contributor is Remote time.
A couple of other metrics that show different behavior post upgrade:

   1. *LogStartOffset*: It has drastically decreased.
   2. *NumDelayedOperations: *It has dropped.

These could be related or may be these are intended good changes in Kafka
0.11.0.0 or one of the previous versions.

Best Regards,
Gaurav Abbi

On Tue, Aug 1, 2017 at 12:11 AM, Apurva Mehta  wrote:

> Thanks for your response. Is it 200% only for the OffsetCommitRequest, or
> is it similar for all the requests?
>
>
> On Mon, Jul 31, 2017 at 12:48 PM, Gaurav Abbi 
> wrote:
>
> > Hi Apurva,
> > 1. The increase is about 200%.
> > 2. There is no increase in throughput. However,  this has caused in error
> > rate and a decrease in the responses received per second.
> >
> >
> > One more thing to mention, we also upgraded to 0.11.0.0 client libraries.
> > We are currently using old Producer and consumer APIs.
> >
> >
> >
> > Best Regards,
> > Gaurav Abbi
> >
> > On Mon, Jul 31, 2017 at 7:46 PM, Apurva Mehta 
> wrote:
> >
> > > How much is the increase? Is there any increase in throughput?
> > >
> > > On Mon, Jul 31, 2017 at 8:04 AM, Gaurav Abbi 
> > > wrote:
> > >
> > > > Hi All,
> > > > We recently upgraded to Kafka 0.11.0.0 from 0.10.1.1.
> > > > Since then we have been observing increased latencies especially
> > > > OffsetCommit requests.
> > > > Looking at the server side metrics, it seems the culprit is the
> > Follower
> > > > time.
> > > >
> > > > We are using following
> > > > inter.broker.protocol.version: 0.11.0.0
> > > > log.message.format.version: 0.9.0.1
> > > >
> > > > Are there some possible pointers that we can explore to troubleshoot
> > the
> > > > root cause?
> > > >
> > > > Best Regards,
> > > > Gaurav Abbi
> > > >
> > >
> >
>


Re: Kafka streams store migration - best practices

2017-08-01 Thread Damian Guy
Hi,

On Tue, 1 Aug 2017 at 08:34 Debasish Ghosh  wrote:

> Hi -
>
> I have a Kafka Streams application that needs to run on multiple instances.
> It fetches metadata from all local stores and has an http query layer for
> interactive queries. In some cases when I have new instances deployed,
> store migration takes place making the current metadata invalid. Here are
> my questions regarding some of the best practices to be followed to handle
> this issue of store migration -
>
>- When the migration is in process, a query for the metadata may result
>in InvalidStateStoreException - is it a good practice to always have a
>retry semantics based query for the metadata ?
>

Yes. Whenever the application is rebalancing the stores will be
unavailable, so retrying is the right thing to do.


>- Should I check KafkaStreams.state() and only assume that I have got
>the correct metadata when the state() call returns Running. If it
>returns Rebalancing, then I should re-query. Is this correct approach ?
>

Correct again! If the state is rebalancing, then the metadata (for some
stores at least) is going to change, so you should get it again. You can
set a StateListener on the KafkaStreams instance to listen to these events.
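
A minimal sketch of both pieces - retrying store() and the optional
StateListener - with the store name and retry settings illustrative:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StoreLookup {

  // Retry the store lookup while the instance is rebalancing/migrating.
  public static ReadOnlyKeyValueStore<String, Long> waitForStore(
      KafkaStreams streams, String storeName, int maxRetries) throws InterruptedException {
    for (int i = 0; i < maxRetries; i++) {
      try {
        return streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
      } catch (InvalidStateStoreException e) {
        Thread.sleep(100); // store is migrating or the app is rebalancing; try again
      }
    }
    throw new InvalidStateStoreException("Store " + storeName + " not available after retries");
  }

  // Optional: be notified when the metadata may need refreshing.
  public static void registerListener(KafkaStreams streams) {
    streams.setStateListener((newState, oldState) ->
        System.out.println("Streams state: " + oldState + " -> " + newState));
  }
}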


>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


Kafka streams store migration - best practices

2017-08-01 Thread Debasish Ghosh
Hi -

I have a Kafka Streams application that needs to run on multiple instances.
It fetches metadata from all local stores and has an http query layer for
interactive queries. In some cases when I have new instances deployed,
store migration takes place making the current metadata invalid. Here are
my questions regarding some of the best practices to be followed to handle
this issue of store migration -

   - When the migration is in process, a query for the metadata may result
   in InvalidStateStoreException - is it a good practice to always have a
   retry semantics based query for the metadata ?
   - Should I check KafkaStreams.state() and only assume that I have got
   the correct metadata when the state() call returns Running. If it
   returns Rebalancing, then I should re-query. Is this correct approach ?

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: Kafka Streams state store issue on cluster

2017-08-01 Thread Debasish Ghosh
I fixed this problem by decoupling the 2 issues ..

   - since I am using an http service for interactive queries, I need an
   http interface. For that I was using 0.0.0.0 and a port number. This is
   still the same - the only change I made is to have a  Mesos configuration
   where Marathon picks up a random port number for me. This ensures that I
   don't ever end up with a port that's currently being used by something else.
   - for APPLICATION_SERVER_CONFIG, I cannot use 0.0.0.0 since I need an
   actual address so that I can use it to query Kafka streams metadata and
   issue a subsequent http GET to fetch the actual data from the host that has
   it. So when setting APPLICATION_SERVER_CONFIG I use the actual INetAddress
   of the host.

This fixes the problem.
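
A minimal sketch of that second change, reusing the settings and config
names from the snippet quoted below (the local-address lookup is just one
way to get the host's address):

  import java.net.InetAddress

  val host = InetAddress.getLocalHost.getHostAddress
  settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"$host:${config.httpPort}")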

regards.

On Fri, Jul 28, 2017 at 8:46 PM, Damian Guy  wrote:

> Hmmm, i'm not sure that is going to work as both nodes will have the same
> setting for StreamsConfig.APPLICATION_SERVER_PORT, i.e, 0.0.0.0:7070
>
> On Fri, 28 Jul 2017 at 16:02 Debasish Ghosh 
> wrote:
>
>> The log file is a huge one. I can send it to you though. Before that let
>> me confirm one point ..
>>
>> I set the APPLICATION_SERVER_CONFIG to s"${config.httpInterface}:${
>> config.httpPort}". In my case the httpInterface is "0.0.0.0" and the port
>> is set to 7070. Since the two instances start on different nodes, this
>> should be ok - right ?
>>
>> regards.
>>
>> On Fri, Jul 28, 2017 at 8:18 PM, Damian Guy  wrote:
>>
>>> Do you have any logs that might help to work out what is going wrong?
>>>
>>> On Fri, 28 Jul 2017 at 14:16 Damian Guy  wrote:
>>>
 The config looks ok to me

 On Fri, 28 Jul 2017 at 13:24 Debasish Ghosh 
 wrote:

> I am setting APPLICATION_SERVER_CONFIG, which is possibly what u r
> referring to. Just now I noticed that I may also need to set
> REPLICATION_FACTOR_CONFIG, which needs to be set to 2 (default is 1).
> Anything else that I may be missing ?
>
>
> regards.
>
> On Fri, Jul 28, 2017 at 5:46 PM, Debasish Ghosh <
> ghosh.debas...@gmail.com>
> wrote:
>
> > Hi Damien -
> >
> > I am not sure I understand what u mean .. I have the following set
> in the
> > application .. Do I need to set anything else at the host level ?
> > Environment variable ?
> >
> > val streamingConfig = {
> >   val settings = new Properties
> >   settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > "kstream-weblog-processing")
> >   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> config.brokers)
> >
> >   config.schemaRegistryUrl.foreach{ url =>
> > settings.put(AbstractKafkaAvroSerDeConfig.
> SCHEMA_REGISTRY_URL_CONFIG,
> > url)
> >   }
> >
> >   settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.ByteArray.getClass.getName)
> >   settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String.getClass.getName)
> >
> >   // setting offset reset to earliest so that we can re-run the
> demo
> > code with the same pre-loaded data
> >   // Note: To re-run the demo, you need to use the offset reset
> tool:
> >   // https://cwiki.apache.org/confluence/display/KAFKA/
> > Kafka+Streams+Application+Reset+Tool
> >   settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> "earliest")
> >
> >   // need this for query service
> >   settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
> > s"${config.httpInterface}:${config.httpPort}")
> >
> >   // default is /tmp/kafka-streams
> >   settings.put(StreamsConfig.STATE_DIR_CONFIG,
> config.stateStoreDir)
> >
> >   // Set the commit interval to 500ms so that any changes are
> flushed
> > frequently and the summary
> >   // data are updated with low latency.
> >   settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500")
> >
> >   settings
> > }
> >
> > Please explain a bit ..
> >
> > regards.
> >
> >
> > On Fri, Jul 28, 2017 at 5:36 PM, Damian Guy 
> wrote:
> >
> >> Hi,
> >>
> >> Do you have the application.server property set appropriately for
> both
> >> hosts?
> >>
> >> The second stack trace is this bug:
> >> https://issues.apache.org/jira/browse/KAFKA-5556
> >>
> >> On Fri, 28 Jul 2017 at 12:55 Debasish Ghosh <
> ghosh.debas...@gmail.com>
> >> wrote:
> >>
> >> > Hi -
> >> >
> >> > In my Kafka Streams application, I have a state store resulting
> from a
> >> > stateful streaming topology. The environment is
> >> >
> >> >- Kafka 0.10.2.1
> >> >- It runs on a DC/OS cluster
> >> >- I