JMX metrics for replica lag time

2017-02-19 Thread Jun MA
Hi,

I’m looking for the JMX metric that represents replica lag time in 0.9.0.1. Based 
on the documentation, I can only find 
kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica, which is 
the max lag in messages between follower and leader replicas. But since in 0.9.0.1 
lag in messages is deprecated and replaced with lag time, I’m wondering what the 
corresponding metric is.
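For reference, here is a minimal sketch of enumerating the broker's lag-related
MBeans over JMX to see exactly what is exposed on this version (the host, JMX port
and the "Value" attribute are placeholders/assumptions to adapt, not a verified
setup):

import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ListLagMetrics {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with JMX enabled, e.g. JMX_PORT=9999.
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // List every kafka.server MBean whose name mentions "Lag".
            Set<ObjectName> names = conn.queryNames(new ObjectName("kafka.server:*"), null);
            for (ObjectName name : names) {
                if (name.getCanonicalName().contains("Lag")) {
                    Object value;
                    try {
                        value = conn.getAttribute(name, "Value"); // gauges expose "Value"
                    } catch (Exception notAGauge) {
                        value = "<no Value attribute>";
                    }
                    System.out.println(name + " = " + value);
                }
            }
        } finally {
            connector.close();
        }
    }
}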

Thanks,
Jun

Re: Storm kafka integration

2017-02-19 Thread Yuanjia
Hi,
You can use storm-kafka-client, which supports storing consumer offsets in the Kafka 
cluster.
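
A rough sketch of wiring up that spout is below. It is based on the
storm-kafka-client builder API as it looks in later Storm 1.x releases, so treat
the exact builder methods as assumptions to verify against your version; the
broker address, topic, spout id and parallelism are placeholders.

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaSpoutTopology {
    public static void main(String[] args) throws Exception {
        // Offsets are committed back to Kafka's __consumer_offsets topic,
        // not to ZooKeeper as with the older storm-kafka spout.
        // Depending on the version you may also need to set a consumer group id
        // on the builder.
        KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("broker1:9092", "my-topic").build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);
        // builder.setBolt(...) would attach the processing bolts here.

        StormSubmitter.submitTopology("kafka-spout-example", new Config(),
                builder.createTopology());
    }
}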



Yuanjia Li
 
From: pradeep s
Date: 2017-02-20 03:49
To: users
Subject: Storm kafka integration
Hi,
I am using Storm 1.0.2 and Kafka 0.10.1.1 and have a query on the Spout code to
integrate with Kafka. As per the Storm docs, it's mentioned to use BrokerHosts
to register the Kafka Spout:
http://storm.apache.org/releases/1.0.2/storm-kafka.html
 
In this case, will the consumer offsets be stored in ZooKeeper? Is this the
preferred approach?
I have read that in the latest Kafka version, consumer offsets can be
maintained in the Kafka cluster itself. Is there any Storm spout example for
this?
Regards
Pradeep S


Re: Kafka exception

2017-02-19 Thread Yuanjia
Hi Liwu,
Correct me if I am wrong.
When ConsumerConnector.shutdown() is called, it sends 
"ZookeeperConsumerConnector.shutdownCommand" to the queue; it does not set the 
ConsumerIterator's state to NOT_READY directly. So the consumer will continue 
consuming until it gets the shutdownCommand from the queue.
Is there any exception information when the method 
ConsumerIterator.makeNext() is called?
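
For context, here is a minimal sketch of the usual 0.8 high-level consumer loop and
shutdown sequence being discussed. The ZooKeeper address, group and topic are
placeholders, and this is only the common pattern, not your exact code:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class HighLevelConsumerShutdown {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk-host:2181");   // placeholder
        props.put("group.id", "example-group");           // placeholder

        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("example-topic", 1));
        ConsumerIterator<byte[], byte[]> it =
                streams.get("example-topic").get(0).iterator();

        Thread consumeThread = new Thread(() -> {
            // hasNext() blocks until a message or the shutdownCommand arrives;
            // it returns false once shutdown() has pushed the command through the queue.
            while (it.hasNext()) {
                byte[] message = it.next().message();
                System.out.println("got " + message.length + " bytes");
            }
        });
        consumeThread.start();

        Thread.sleep(10_000);   // consume for a while
        consumer.shutdown();    // the closing thread enqueues the shutdownCommand here
        consumeThread.join();
    }
}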




Yuanjia Li
 
From: 揣立武
Date: 2017-02-16 14:29
To: dev
CC: users; 陈希; 涂扬
Subject: Kafka exception
 
Hi all! Our program uses the high-level consumer API (version 0.8.x). 
Sometimes the program throws an exception at line 42 of the 
kafka.utils.IteratorTemplate class, namely "throw new 
IllegalStateException("Expected item but none found.")".

I think it is a race condition between the close thread and the consume 
thread. When the close thread calls the method ConsumerConnector.shutdown(), 
it sets the ConsumerIterator's state to NOT_READY. But at the same time, the 
consume thread calls the method ConsumerIterator.hasNext() and reaches line 67 
of the kafka.utils.IteratorTemplate class, "if(state == DONE) {"; the condition 
is false, which means an item appears to be available. Then when 
ConsumerIterator.next() is called, it throws that exception.

Have you ever had this problem? Please tell me how to deal with it, thanks!
 
 
 


Re: Log twice ProducerConfig & ConsumerConfig values

2017-02-19 Thread Francesco laTorre
Hi,

And on a side note, it's logged _many_ times. I had to suppress some
logging at package level :-/
Anybody else experiencing the same?
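
For anyone who wants to do the same, a minimal log4j.properties sketch that raises
the level just for the config-dumping classes (the logger names follow the Kafka
client class names; adapt this to whatever logging backend you actually use):

# quiet the "ProducerConfig values:" / "ConsumerConfig values:" dumps
log4j.logger.org.apache.kafka.clients.producer.ProducerConfig=WARN
log4j.logger.org.apache.kafka.clients.consumer.ConsumerConfig=WARN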

Cheers,
Francesco

On 20 February 2017 at 00:04, Simon Teles  wrote:

> Hello,
>
> I'm curious to know why, when the producer/consumer are created, the
> ProducerConfig and ConsumerConfig values are logged twice? Is that normal?
>
> Example :
>
> 10:52:08.963 INFO  [o.a.k.s.p.i.StreamThread||l.170] ~~ Creating
> producer client for stream thread [StreamThread-1]
> 10:52:08.969 INFO  [o.a.k.c.p.ProducerConfig|logAll|l.178] ~~
> ProducerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 30
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> bootstrap.servers = [kafka:9092]
> ssl.keystore.type = JKS
> sasl.mechanism = GSSAPI
> max.block.ms = 6
> interceptor.classes = null
> ssl.truststore.password = null
> client.id = test-stream-2-StreamThread-1-producer
> ssl.endpoint.identification.algorithm = null
> request.timeout.ms = 3
> acks = 1
> receive.buffer.bytes = 32768
> ssl.truststore.type = JKS
> retries = 0
> ssl.truststore.location = null
> ssl.keystore.password = null
> send.buffer.bytes = 131072
> compression.type = none
> metadata.fetch.timeout.ms = 6
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> buffer.memory = 33554432
> timeout.ms = 3
> key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> block.on.buffer.full = false
> ssl.key.password = null
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> batch.size = 16384
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> max.request.size = 1048576
> value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
> linger.ms = 100
>
> 10:52:08.996 INFO  [o.a.k.c.p.ProducerConfig|logAll|l.178] ~~
> ProducerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 30
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> bootstrap.servers = [kafka:9092]
> ssl.keystore.type = JKS
> sasl.mechanism = GSSAPI
> max.block.ms = 6
> interceptor.classes = null
> ssl.truststore.password = null
> client.id = test-stream-2-StreamThread-1-producer
> ssl.endpoint.identification.algorithm = null
> request.timeout.ms = 3
> acks = 1
> receive.buffer.bytes = 32768
> ssl.truststore.type = JKS
> retries = 0
> ssl.truststore.location = null
> ssl.keystore.password = null
> send.buffer.bytes = 131072
> compression.type = none
> metadata.fetch.timeout.ms = 6
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> buffer.memory = 33554432
> timeout.ms = 3
> key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> block.on.buffer.full = false
> ssl.key.password = null
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> batch.size = 16384
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> max.request.size = 1048576
> value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
> linger.ms = 100
>
> It's the same for ConsumerConfig and for "Creating restore consumer
> client".
>
> Thanks,
>
> Simon
>
>


-- 
 Francesco laTorre
Senior Developer
T: +44 208 742 1600
+44 203 249 8394

E: francesco.lato...@openbet.com
W: www.openbet.com
OpenBet Ltd
Chiswick Park Building 9
566 Chiswick High Rd
London
W4 5XT


Security Documentation contradiction / misleading ?

2017-02-19 Thread Stephane Maarek
Hi,

I’m wondering if the official Kafka documentation is misleading. Here (
https://kafka.apache.org/documentation/#security_sasl_brokernotes) you can
read:

   1. Client section is used to authenticate a SASL connection with
   zookeeper. It also allows the brokers to set SASL ACL on zookeeper nodes
   which locks these nodes down so that only the brokers can modify it. It
   is necessary to have the same principal name across all brokers. If you
   want to use a section name other than Client, set the system property
   zookeeper.sasl.client to the appropriate name (e.g.,
   -Dzookeeper.sasl.client=ZkClient).

And then right here
https://kafka.apache.org/documentation/#security_sasl_kerberos_brokerconfig the
suggested JAAS file is:

// Zookeeper client authentication
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_server.keytab"*
principal="kafka/kafka1.hostname@example.com
";
*};

Which in my opinion shows that every broker should have a different
principal name to connect to Zookeeper. Is that misleading, or am I missing
something?

Thanks,
Stephane


Kafka Schema Registry JAAS file

2017-02-19 Thread Stephane Maarek
Hi

It’d be great to document what the JAAS file might look like at:
http://docs.confluent.io/3.1.2/schema-registry/docs/security.html

I need to ask my IT for principals, which takes a while, so is this a
correct JAAS file?

KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/keytabs/kafka-schema-registry.keytab"
principal="kafka-schema-regis...@example.com";
}

Client{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/keytabs/myzkclient.keytab"
principal="myzkcli...@example.com";
}

My guess is that the Client section needs to be the exact same for
schema-registry and kafka brokers because they both manipulate the same
znodes?

Regarding the KafkaClient section, that’s where I’m a little bit lost. Schema
Registry will authenticate to Kafka using SASL on port 9095, but does it then
need any ACLs or permissions? Or am I missing something?
And where do I set the serviceName in the JAAS file?
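
For what it's worth, here is a hedged sketch of where the Kerberos service name is
usually set for a Kafka client, namely as an option in the KafkaClient JAAS entry.
The keytab and principal below are placeholders, not a verified Schema Registry
setup:

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/keytabs/kafka-schema-registry.keytab"
principal="schema-registry@EXAMPLE.COM"
serviceName="kafka";
};

Alternatively, the client property sasl.kerberos.service.name=kafka should serve
the same purpose if the JAAS option is not picked up by your client version.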

Thanks
Stephane


Log twice ProducerConfig & ConsumerConfig values

2017-02-19 Thread Simon Teles

Hello,

I'm curious to know why, when the producer/consumer are created, the 
ProducerConfig and ConsumerConfig values are logged twice? Is that normal?


Example :

10:52:08.963 INFO  [o.a.k.s.p.i.StreamThread||l.170] ~~ Creating 
producer client for stream thread [StreamThread-1]
10:52:08.969 INFO  [o.a.k.c.p.ProducerConfig|logAll|l.178] ~~ 
ProducerConfig values:

metric.reporters = []
metadata.max.age.ms = 30
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [kafka:9092]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 6
interceptor.classes = null
ssl.truststore.password = null
client.id = test-stream-2-StreamThread-1-producer
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 3
acks = 1
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 6
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 3
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 6
connections.max.idle.ms = 54
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 100

10:52:08.996 INFO  [o.a.k.c.p.ProducerConfig|logAll|l.178] ~~ 
ProducerConfig values:

metric.reporters = []
metadata.max.age.ms = 30
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [kafka:9092]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 6
interceptor.classes = null
ssl.truststore.password = null
client.id = test-stream-2-StreamThread-1-producer
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 3
acks = 1
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 6
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 3
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 6
connections.max.idle.ms = 54
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 100

It's the same for ConsumerConfig and for "Creating restore consumer client".

Thanks,

Simon



Frequently shrink/expand ISR on one broker

2017-02-19 Thread Jun MA
Hi team,

We are running Confluent 0.9.0.1 on a cluster with 6 brokers. These days one of 
our brokers (broker 1) frequently shrinks the ISR and expands it again immediately, 
about every 20 minutes, and I can’t find out why. Based on the log, it can kick out 
any of the other brokers, not just a specific one. Here’s one log sample:

[2017-02-19 13:01:01,741] ERROR Processor got uncaught exception. 
(kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException
[2017-02-19 13:01:01,777] ERROR Processor got uncaught exception. 
(kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException
[2017-02-19 13:01:01,781] ERROR Processor got uncaught exception. 
(kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException
[2017-02-19 13:01:01,796] ERROR Processor got uncaught exception. 
(kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException
[2017-02-19 13:01:17,230] INFO Partition [khloe-updates-prod-els,0] on broker 
1: Shrinking ISR for partition [khloe-updates-prod-els,0] from 1,5,4 to 1,4 
(kafka.cluster.Partition)
[2017-02-19 13:01:17,231] INFO Partition [jtop-migration-prod-els,37] on broker 
1: Shrinking ISR for partition [jtop-migration-prod-els,37] from 1,5,6 to 1,6 
(kafka.cluster.Partition)
[2017-02-19 13:01:17,232] INFO Partition [hummus-asset-v2-prod-els,0] on broker 
1: Shrinking ISR for partition [hummus-asset-v2-prod-els,0] from 1,5,3 to 1,3 
(kafka.cluster.Partition)
[2017-02-19 13:01:17,233] INFO Partition [jtop-migration-prod-els,57] on broker 
1: Shrinking ISR for partition [jtop-migration-prod-els,57] from 1,5,6 to 1,6 
(kafka.cluster.Partition)
[2017-02-19 13:01:17,234] INFO Partition [jtop-migration-smoke-els,6] on broker 
1: Shrinking ISR for partition [jtop-migration-smoke-els,6] from 1,5,4 to 1,4 
(kafka.cluster.Partition)
[2017-02-19 13:01:17,234] INFO Partition [jtop-migration-prod-els,12] on broker 
1: Shrinking ISR for partition [jtop-migration-prod-els,12] from 1,5,4 to 1,4 
(kafka.cluster.Partition)
[2017-02-19 13:01:17,235] INFO Partition [__consumer_offsets,39] on broker 1: 
Shrinking ISR for partition [__consumer_offsets,39] from 1,5,4 to 1,4 
(kafka.cluster.Partition)
[2017-02-19 13:01:17,236] INFO Partition [jtop-migration-smoke-els,51] on 
broker 1: Shrinking ISR for partition [jtop-migration-smoke-els,51] from 1,5,6 
to 1,6 (kafka.cluster.Partition)
[2017-02-19 13:01:17,237] INFO Partition [hummus-entity-merged-v2-prod-els,0] 
on broker 1: Shrinking ISR for partition [hummus-entity-merged-v2-prod-els,0] 
from 1,5,4 to 1,4 (kafka.cluster.Partition)
[2017-02-19 13:01:17,238] INFO Partition 
[resolver-notification-profiles-staging-els,1] on broker 1: Shrinking ISR for 
partition [resolver-notification-profiles-staging-els,1] from 1,5,4 to 1,4 
(kafka.cluster.Partition)
[2017-02-19 13:01:17,239] INFO Partition [jtop-migration-prod-els,52] on broker 
1: Shrinking ISR for partition [jtop-migration-prod-els,52] from 1,5,4 to 1,4 
(kafka.cluster.Partition)
[2017-02-19 13:01:17,240] INFO Partition [compass-home-record-prod-els,0] on 
broker 1: Shrinking ISR for partition [compass-home-record-prod-els,0] from 
1,5,4 to 1,4 (kafka.cluster.Partition)
[2017-02-19 13:01:17,241] INFO Partition [jtop-migration-smoke-els,46] on 
broker 1: Shrinking ISR for partition [jtop-migration-smoke-els,46] from 1,5,4 
to 1,4 (kafka.cluster.Partition)
[2017-02-19 13:01:17,242] INFO Partition [stolmen-entity-prod-els,2] on broker 
1: Shrinking ISR for partition [stolmen-entity-prod-els,2] from 1,5,6 to 1,6 
(kafka.cluster.Partition)
[2017-02-19 13:01:17,243] INFO Partition [hummus-boundary-v2-prod-els,0] on 
broker 1: Shrinking ISR for partition [hummus-boundary-v2-prod-els,0] from 
1,5,3 to 1,3 (kafka.cluster.Partition)
[2017-02-19 13:01:17,244] INFO Partition [jtop-migration-smoke-els,11] on 
broker 1: Shrinking ISR for partition [jtop-migration-smoke-els,11] from 1,5,6 
to 1,6 (kafka.cluster.Partition)
[2017-02-19 13:01:17,244] INFO Partition [flintan-asset-view-smoke-els,4] on 
broker 1: Shrinking ISR for partition [flintan-asset-view-smoke-els,4] from 
1,5,4 to 1,4 (kafka.cluster.Partition)
[2017-02-19 13:01:17,245] INFO Partition [stolmen-bundle-prod-els,3] on broker 
1: Shrinking ISR for partition [stolmen-bundle-prod-els,3] from 1,5,6 to 1,6 
(kafka.cluster.Partition)
[2017-02-19 13:01:17,246] INFO Partition [flintan-asset-view-prod-els,23] on 
broker 1: Shrinking ISR for partition [flintan-asset-view-prod-els,23] from 
1,5,6 to 1,6 (kafka.cluster.Partition)
[2017-02-19 13:01:17,247] INFO Partition 
[resolver-notifcation-requests-prod-els,0] on broker 1: Shrinking ISR for 
partition [resolver-notifcation-requests-prod-els,0] from 1,5,6 to 1,6 
(kafka.cluster.Partition)
[2017-02-19 13:01:17,248] INFO Partition [mj,0] on broker 1: Shrinking ISR for 
partition [mj,0] from 1,5,6 to 1,6 (kafka.cluster.Partition)
[2017-02-19 13:01:17,249] INFO Partition [flintan-asset-view-prod-els,17] on 
broker 1: Shrinking ISR for partition [flintan-asset-view-prod-els,17] from 
1,5,4 to 1,4 
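
As a side note for anyone comparing settings, here is a hedged sketch of the broker
property that governs when a follower is dropped from the ISR on 0.9 (the value
shown is the default as I recall it; verify against your own server.properties):

# server.properties (0.9.x): a follower is removed from the ISR if it has not
# sent a fetch request, or has not caught up to the leader's log end offset,
# within this many milliseconds
replica.lag.time.max.ms=10000

# replica.lag.max.messages no longer exists in 0.9; lag is purely time-based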

Re: Storm kafka integration

2017-02-19 Thread Matthias J. Sax
You should ask the Storm people; the Kafka Spout is not provided by the Kafka community.

Or maybe try out Kafka's Streams API (couldn't resist... ;) )


-Matthias

On 2/19/17 11:49 AM, pradeep s wrote:
> Hi,
> I am using Storm 1.0.2 and Kafka 0.10.1.1 and have a query on the Spout code to
> integrate with Kafka. As per the Storm docs, it's mentioned to use BrokerHosts
> to register the Kafka Spout:
> http://storm.apache.org/releases/1.0.2/storm-kafka.html
> 
> In this case, will the consumer offsets be stored in ZooKeeper? Is this the
> preferred approach?
> I have read that in the latest Kafka version, consumer offsets can be
> maintained in the Kafka cluster itself. Is there any Storm spout example for
> this?
> Regards
> Pradeep S
> 





Storm kafka integration

2017-02-19 Thread pradeep s
Hi,
I am using Storm 1.0.2 and Kafka 0.10.1.1 and have a query on the Spout code to
integrate with Kafka. As per the Storm docs, it's mentioned to use BrokerHosts
to register the Kafka Spout:
http://storm.apache.org/releases/1.0.2/storm-kafka.html

In this case, will the consumer offsets be stored in ZooKeeper? Is this the
preferred approach?
I have read that in the latest Kafka version, consumer offsets can be
maintained in the Kafka cluster itself. Is there any Storm spout example for
this?
Regards
Pradeep S