[jira] [Updated] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-07-10 Thread Ashish Surana (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Surana updated KAFKA-7149:
-
Attachment: (was: AssignmentInfo.java)

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that with a high number of partitions, instances, or 
> stream-threads, the assignment-data size grows so fast that the broker starts 
> throwing RecordTooLargeException.
> A workaround for that exception is described at 
> https://issues.apache.org/jira/browse/KAFKA-6976, but it still limits the 
> scalability of Kafka Streams: moving around 100 MB of assignment data on 
> every rebalance hurts performance and reliability (timeout exceptions start 
> appearing). Even with a high max.message.bytes setting, the data size grows 
> quickly with the number of partitions, instances, or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we now send the assignment data 
> compressed, which reduced its size by 8-10x. This improved Kafka Streams 
> scalability drastically for us, and we can now run with more than 8,000 
> partitions.
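For illustration, a minimal sketch of the compression step described above. 
GZIP and the helper name are assumptions; the ticket only says the assignment 
data is sent compressed:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class AssignmentCompression {
    // Hypothetical helper: compress already-serialized assignment bytes
    // before they are embedded in the group rebalance metadata.
    static byte[] compress(byte[] serializedAssignment) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(serializedAssignment);
        }
        return out.toByteArray();
    }
}
{code}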



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6123) Give client MetricsReporter auto-generated client.id

2018-07-10 Thread Kevin Lu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Lu reassigned KAFKA-6123:
---

Assignee: Kevin Lu

> Give client MetricsReporter auto-generated client.id
> 
>
> Key: KAFKA-6123
> URL: https://issues.apache.org/jira/browse/KAFKA-6123
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, metrics
>Reporter: Kevin Lu
>Assignee: Kevin Lu
>Priority: Minor
>  Labels: clients, metrics, newbie++
>
> The KAFKA-4756 bugfix resolved the broker's KafkaMetricsReporter missing the 
> auto-generated broker id, but the same problem was not fixed on the client 
> side.
>  
> Metric reporters configured for clients should also be given the 
> auto-generated client.id in the `configure` method, just as interceptors 
> already receive it.
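For context, a minimal sketch of a client-side MetricsReporter reading 
client.id in configure(); the reporter class itself is hypothetical, and per 
this ticket the value it sees today is the user-supplied one, not the 
auto-generated id:

{code:java}
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

public class ClientIdLoggingReporter implements MetricsReporter {
    private String clientId;

    @Override
    public void configure(Map<String, ?> configs) {
        // Before this fix, "client.id" here is the user-supplied value,
        // which is empty when the client auto-generates one.
        clientId = (String) configs.get("client.id");
    }

    @Override
    public void init(List<KafkaMetric> metrics) {
        System.out.println("Reporting metrics for client.id=" + clientId);
    }

    @Override
    public void metricChange(KafkaMetric metric) { }

    @Override
    public void metricRemoval(KafkaMetric metric) { }

    @Override
    public void close() { }
}
{code}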



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7144) Kafka Streams doesn't properly balance partition assignment

2018-07-10 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539578#comment-16539578
 ] 

Guozhang Wang commented on KAFKA-7144:
--

Hi [~wushujames], how many threads did you configure with these two instances?

> Kafka Streams doesn't properly balance partition assignment
> ---
>
> Key: KAFKA-7144
> URL: https://issues.apache.org/jira/browse/KAFKA-7144
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: James Cheng
>Priority: Major
> Attachments: OneThenTwelve.java
>
>
> Kafka Streams doesn't always spread the tasks across all available 
> instances/threads.
> I have a topology which consumes a single-partition topic and goes .through() 
> a 12-partition topic. That makes 13 partitions in total.
>  
> I then started 2 instances of the application. I would have expected the 13 
> partitions to be split across the 2 instances roughly evenly (7 partitions on 
> one, 6 on the other). Instead, one instance gets 12 partitions and the other 
> gets 1 partition.
>  
> Repro case attached. I ran it a couple of times, and it was fairly repeatable.
> Setup for the repro:
> {code:java}
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic one 
> --partitions 1 --replication-factor 1 
> $ ./bin/kafka-topics.sh --zookeeper localhost --create --topic twelve 
> --partitions 12 --replication-factor 1
> $ echo foo | kafkacat -P -b 127.0.0.1 -t one
> {code}
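The attached OneThenTwelve.java is not reproduced in this digest; a hedged 
sketch of a topology matching the description, against the 1.1 Streams API 
used in the report (topic names from the repro, all other settings assumed):

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class OneThenTwelveSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "one-then-twelve");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // 1 source partition + 12 partitions of the intermediate topic
        // = 13 tasks that should be spread over the running instances.
        builder.<String, String>stream("one")
               .through("twelve")
               .foreach((k, v) -> System.out.println(k + "=" + v));

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}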



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6976) Kafka Streams instances going in to DEAD state

2018-07-10 Thread Ashish Surana (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539565#comment-16539565
 ] 

Ashish Surana commented on KAFKA-6976:
--

Created the ticket below to address this issue properly:

https://issues.apache.org/jira/browse/KAFKA-7149: Reduce assignment data size 
to improve kafka streams scalability

> Kafka Streams instances going in to DEAD state
> --
>
> Key: KAFKA-6976
> URL: https://issues.apache.org/jira/browse/KAFKA-6976
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Deepak Goyal
>Priority: Blocker
> Attachments: kafkaStreamsDeadState.log
>
>
> We are using Kafka 0.10.2.0 with Kafka Streams 1.1.0. We have a Kafka cluster 
> of 16 machines, and the topic being consumed by Kafka Streams has 256 
> partitions. We spawned 400 machines running the Kafka Streams application, 
> and we see all of the StreamThreads go into DEAD state.
> {quote}
> [2018-05-25 05:59:29,282] INFO stream-thread 
> [ksapp-19f923d7-5f9e-4137-b79f-ee20945a7dd7-StreamThread-1] State transition 
> from PENDING_SHUTDOWN to DEAD 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2018-05-25 05:59:29,282] INFO stream-client 
> [ksapp-19f923d7-5f9e-4137-b79f-ee20945a7dd7] State transition from 
> REBALANCING to ERROR (org.apache.kafka.streams.KafkaStreams)
> [2018-05-25 05:59:29,282] WARN stream-client 
> [ksapp-19f923d7-5f9e-4137-b79f-ee20945a7dd7] All stream threads have died. 
> The instance will be in error state and should be closed. 
> (org.apache.kafka.streams.KafkaStreams)
> [2018-05-25 05:59:29,282] INFO stream-thread 
> [ksapp-19f923d7-5f9e-4137-b79f-ee20945a7dd7-StreamThread-1] Shutdown complete 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> {quote}
> Please note that when we have only 100 Kafka Streams application machines, 
> things work as expected and the instances consume messages from the topic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-07-10 Thread Ashish Surana (JIRA)
Ashish Surana created KAFKA-7149:


 Summary: Reduce assignment data size to improve kafka streams 
scalability
 Key: KAFKA-7149
 URL: https://issues.apache.org/jira/browse/KAFKA-7149
 Project: Kafka
  Issue Type: Improvement
Reporter: Ashish Surana


We observed that with a high number of partitions, instances, or 
stream-threads, the assignment-data size grows so fast that the broker starts 
throwing RecordTooLargeException.

A workaround for that exception is described at 
https://issues.apache.org/jira/browse/KAFKA-6976, but it still limits the 
scalability of Kafka Streams: moving around 100 MB of assignment data on every 
rebalance hurts performance and reliability (timeout exceptions start 
appearing). Even with a high max.message.bytes setting, the data size grows 
quickly with the number of partitions, instances, or stream-threads.

 

Solution:

To address this issue in our cluster, we now send the assignment data 
compressed, which reduced its size by 8-10x. This improved Kafka Streams 
scalability drastically for us, and we can now run with more than 8,000 
partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7148) Kafka load log very slow after goes down with outOfMemoryError

2018-07-10 Thread wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wang updated KAFKA-7148:

Description: 
Two questions:

1. Is there any way to avoid the "IOException: Map failed", or is there a limit 
on VM memory size?

2. Is it normal for Kafka to take 20+ seconds to load one partition's log file?

 

Details:

1. Linux version:


 kafka_2.11-0.10.1.1> cat /etc/SuSE-release
 SUSE Linux Enterprise Server 11 (x86_64)
 VERSION = 11
 PATCHLEVEL = 3

 

2. VM info: 4 cores, 32 GB RAM

 

3. java -version
 java version "1.8.0_131"
 Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
 Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)

 

 

4. Start command:

java -Xmx16G -Xms16G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
-XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC 
-Djava.awt.headless=true 
-Xloggc:/opt/***/kafka_2.11-0.10.1.1/bin/../logs/kafkaServer-gc.log -verbose:gc 
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 
-Dkafka.logs.dir=/opt/***/kafka_2.11-0.10.1.1/bin/../logs 
-Dlog4j.configuration=file:./../config/log4j.properties
 -cp :

 

 

5. Brokers

We have 3 brokers and 3 ZooKeeper nodes, with 4 topics in this Kafka cluster:
 __consumer_offsets: 50 partitions, replication factor 3
 topic1: 5 partitions, replication factor 3
 topic2: 160 partitions, replication factor 3
 topic3: 160 partitions, replication factor 3

Total data disk usage: 32G
 du -sh data/
 32G data/

 

 

6. Logs

[2018-07-10 17:23:59,728] FATAL [Replica Manager on Broker 1]: Halting due to 
unrecoverable I/O error while handling produce request: 
(kafka.server.ReplicaManager)
 kafka.common.KafkaStorageException: I/O exception in append to log 
'**-13'
 at kafka.log.Log.append(Log.scala:349)
 at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
 at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
 at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
 at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
 at 
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
 at 
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
 at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
 at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:436)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: java.io.IOException: Map failed
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
 at kafka.log.AbstractIndex.<init>(AbstractIndex.scala:61)
 at kafka.log.OffsetIndex.<init>(OffsetIndex.scala:52)
 at kafka.log.LogSegment.<init>(LogSegment.scala:67)
 at kafka.log.Log.roll(Log.scala:778)
 at kafka.log.Log.maybeRoll(Log.scala:744)
 at kafka.log.Log.append(Log.scala:405)
 ... 22 more
 Caused by: java.lang.OutOfMemoryError: Map failed
 at sun.nio.ch.FileChannelImpl.map0(Native Method)
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937)
 ... 28 more
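For background: Kafka memory-maps each segment's index file, and the JVM 
surfaces a failed mmap as the "OutOfMemoryError: Map failed" seen above, 
typically when the process exceeds vm.max_map_count or runs out of virtual 
address space. A minimal sketch of the underlying call (not Kafka's actual 
code; file name and size are illustrative):

{code:java}
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class IndexMmapDemo {
    public static void main(String[] args) throws Exception {
        // Kafka maps each segment's offset index roughly like this; every
        // mapping counts against the per-process limit (vm.max_map_count).
        try (RandomAccessFile raf =
                 new RandomAccessFile("00000000000000000000.index", "rw")) {
            raf.setLength(10 * 1024 * 1024);  // pre-allocate, as Kafka does
            MappedByteBuffer mmap = raf.getChannel()
                .map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
            System.out.println("mapped " + mmap.capacity() + " bytes");
        }
    }
}
{code}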

 

 

7. Then I followed 
https://issues.apache.org/jira/browse/KAFKA-6165?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22sun.nio.ch.FileChannelImpl.map%20Map%20failed%22

and raised vm.max_map_count from the default value (65536) to 262144:

/sbin/sysctl -a | grep map
 vm.max_map_count = 262144

cat /proc/2860/maps | wc -l
 1195

and changed kafka-run-class.sh to replace `-XX:+DisableExplicitGC` with 
`-XX:+ExplicitGCInvokesConcurrent`.

 

 

8. But the "IOException: Map failed" problem still exists.

We then upgraded the server to 4 cores / 64 GB RAM and changed -Xmx16G -Xms16G 
to -Xmx4G -Xms4G.

 

 

9. Slow log loading:

[2018-07-10 17:35:33,481] INFO Completed load of log ***-37 with 2 log segments 
and log end offset 2441365 in 15678 ms (kafka.log.Log)
 [2018-07-10 17:35:33,484] WARN Found a corrupted index file due to requirement 
failed: Corrupt index found, index file 
(/opt/***/data/***-34/02451611.index) has non-zero size but the 
last offset is 2451611 which is no larger than the base offset 2451611.}. 
deleting 

[jira] [Created] (KAFKA-7148) Kafka load log very slow after goes down with outOfMemoryError

2018-07-10 Thread wang (JIRA)
wang created KAFKA-7148:
---

 Summary: Kafka load log very slow after goes down with 
outOfMemoryError
 Key: KAFKA-7148
 URL: https://issues.apache.org/jira/browse/KAFKA-7148
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.1.1
Reporter: wang


Two questions:

1. Is there any way to avoid the "IOException: Map failed", or is there a limit 
on VM memory size?

2. Is it normal for Kafka to take 20+ seconds to load one partition's log file?

 

Details:

1. Linux version:
 kafka_2.11-0.10.1.1> cat /etc/SuSE-release
 SUSE Linux Enterprise Server 11 (x86_64)
 VERSION = 11
 PATCHLEVEL = 3
 
2. VM info: 4 cores, 32 GB RAM

3. java -version
 java version "1.8.0_131"
 Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
 Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)

 

4. Start command:

java -Xmx16G -Xms16G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
-XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC 
-Djava.awt.headless=true 
-Xloggc:/opt/***/kafka_2.11-0.10.1.1/bin/../logs/kafkaServer-gc.log -verbose:gc 
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 
-Dkafka.logs.dir=/opt/***/kafka_2.11-0.10.1.1/bin/../logs 
-Dlog4j.configuration=file:./../config/log4j.properties -cp :


5. Brokers

We have 3 brokers and 3 ZooKeeper nodes, with 4 topics in this Kafka cluster:
 __consumer_offsets: 50 partitions, replication factor 3
 topic1: 5 partitions, replication factor 3
 topic2: 160 partitions, replication factor 3
 topic3: 160 partitions, replication factor 3

Total data disk usage: 32G
 du -sh data/
 32G data/


6. Logs

[2018-07-10 17:23:59,728] FATAL [Replica Manager on Broker 1]: Halting due to 
unrecoverable I/O error while handling produce request: 
(kafka.server.ReplicaManager)
kafka.common.KafkaStorageException: I/O exception in append to log 
'**-13'
 at kafka.log.Log.append(Log.scala:349)
 at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
 at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
 at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
 at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
 at 
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
 at 
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
 at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
 at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:436)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Map failed
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
 at kafka.log.AbstractIndex.<init>(AbstractIndex.scala:61)
 at kafka.log.OffsetIndex.<init>(OffsetIndex.scala:52)
 at kafka.log.LogSegment.<init>(LogSegment.scala:67)
 at kafka.log.Log.roll(Log.scala:778)
 at kafka.log.Log.maybeRoll(Log.scala:744)
 at kafka.log.Log.append(Log.scala:405)
 ... 22 more
Caused by: java.lang.OutOfMemoryError: Map failed
 at sun.nio.ch.FileChannelImpl.map0(Native Method)
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937)
 ... 28 more

 

7. Then I followed 
https://issues.apache.org/jira/browse/KAFKA-6165?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22sun.nio.ch.FileChannelImpl.map%20Map%20failed%22

and raised vm.max_map_count from the default value (65536) to 262144:

/sbin/sysctl -a | grep map
 vm.max_map_count = 262144

cat /proc/2860/maps | wc -l
 1195

and changed kafka-run-class.sh to replace `-XX:+DisableExplicitGC` with 
`-XX:+ExplicitGCInvokesConcurrent`.


8. But the "IOException: Map failed" problem still exists.

We then upgraded the server to 4 cores / 64 GB RAM and changed -Xmx16G -Xms16G 
to -Xmx4G -Xms4G.


9. Slow log loading:
 
 [2018-07-10 17:35:33,481] INFO Completed load of log ***-37 with 2 log 
segments and log end offset 2441365 in 15678 ms (kafka.log.Log)
 [2018-07-10 17:35:33,484] WARN Found a corrupted index file due to requirement 
failed: Corrupt index found, index file 

[jira] [Updated] (KAFKA-7143) Cannot use KafkaConsumer with Kotlin coroutines due to various issues

2018-07-10 Thread Raman Gupta (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raman Gupta updated KAFKA-7143:
---
Description: 
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
supports a style of async programming that avoids the need for callbacks (and 
existing callback-based API's are usually easily be adapted to this style with 
a simple wrapper). With coroutines, methods with callbacks are suspended, and 
resumed once the call is complete. With this approach, while access to the 
KafkaConsumer is done in a thread-safe way, it does NOT necessarily happen from 
a single thread -- a different underlying thread may actually execute the code 
after the suspension point.

However, the KafkaConsumer includes additional checks to verify not only the 
thread safety of the client, but that the *same thread* is being used -- if the 
same thread (by id) is not being used the consumer throws an exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable this check so that this code can be used 
effectively by libraries such as Kotlin coroutines.

There is a workaround for the above: run the consumer in a coroutine with a 
single-thread context, which isn't ideal because it dedicates a thread to the 
consumer. However, further problems await -- the `commitAsync` method also 
cannot be used with coroutines because the callback is never executed and 
therefore the coroutine is never resumed past the suspension point. The 
callback seems to only be executed on future calls to poll, which will never 
happen because of the suspension, so we have a deadlock. I guess the idea 
behind this Kafka consumer API design is that consuming new messages may 
continue, even though commits of previous offsets (which happened an 
arbitrarily long amount of time in the past) have not necessarily been 
processed. However, with a coroutine based API, the commitAsync can be 
sequential before the next poll like commitSync, but happen asynchronously 
without tying up a client application thread.

  was:
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
supports a style of async programming that avoids the need for callbacks (and 
existing callback-based APIs can usually be adapted easily to this style with 
a simple wrapper). With coroutines, methods with callbacks are suspended, and 
resumed once the call is complete. With this approach, while access to the 
KafkaConsumer is done in a thread-safe way, it does NOT necessarily happen from 
a single thread -- a different underlying thread may actually execute the code 
after the suspension point.

However, the KafkaConsumer includes additional checks to verify not only the 
thread safety of the client, but that the *same thread* is being used -- if the 
same thread (by id) is not being used the consumer throws an exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable this check so that this code can be used 
effectively by libraries such as Kotlin coroutines.

There is a workaround for the above: run the consumer in a coroutine with a 
single-thread context, which isn't ideal because it dedicates a thread to the 
consumer. However, further problems await -- the `commitAsync` method also 
cannot be used with coroutines because the callback is never executed and 
therefore the coroutine is never resumed past the suspension point. The 
callback seems to only be executed on future calls to poll, which will never 
happen because of the suspension, so we have a deadlock. I guess the idea 
behind this API design is that consuming new messages may continue, even though 
commits of previous offsets have not necessarily been processed. However, with 
a coroutine based 

[jira] [Updated] (KAFKA-7143) Cannot use KafkaConsumer with Kotlin coroutines due to various issues

2018-07-10 Thread Raman Gupta (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raman Gupta updated KAFKA-7143:
---
Description: 
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
supports a style of async programming that avoids the need for callbacks (and 
existing callback-based APIs can usually be adapted easily to this style with 
a simple wrapper). With coroutines, methods with callbacks are suspended, and 
resumed once the call is complete. With this approach, while access to the 
KafkaConsumer is done in a thread-safe way, it does NOT necessarily happen from 
a single thread -- a different underlying thread may actually execute the code 
after the suspension point.

However, the KafkaConsumer includes additional checks to verify not only the 
thread safety of the client, but that the *same thread* is being used -- if the 
same thread (by id) is not being used the consumer throws an exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable this check so that this code can be used 
effectively by libraries such as Kotlin coroutines.

There is a workaround for the above: run the consumer in a coroutine with a 
single-thread context, which isn't ideal because it dedicates a thread to the 
consumer. However, further problems await -- the `commitAsync` method also 
cannot be used with coroutines because the callback is never executed and 
therefore the coroutine is never resumed past the suspension point. The 
callback seems to only be executed on future calls to poll, which will never 
happen because of the suspension, so we have a deadlock. I guess the idea 
behind this API design is that consuming new messages may continue, even though 
commits of previous offsets have not necessarily been processed. However, with 
a coroutine based API, the commitAsync can be sequential before the next poll 
like commitSync, but happen asynchronously without tying up a client 
application thread.

  was:
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
supports a style of async programming that avoids the need for callbacks (and 
existing callback-based APIs can usually be adapted easily to this style with 
a simple wrapper). With coroutines, methods with callbacks are suspended, and 
resumed once the call is complete. With this approach, while access to the 
KafkaConsumer is done in a thread-safe way, it does NOT happen from a single 
thread -- a different underlying thread may actually execute the code after the 
suspension point.

However, the KafkaConsumer includes additional checks to verify not only the 
thread safety of the client, but that the *same thread* is being used -- if the 
same thread (by id) is not being used the consumer throws an exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable this check so that this code can be used 
effectively by libraries such as Kotlin coroutines.

There is a workaround for the above: run the consumer in a coroutine with a 
single-thread context, which isn't ideal because it dedicates a thread to the 
consumer. However, further problems await -- the `commitAsync` method also 
cannot be used with coroutines because the callback is never executed and 
therefore the coroutine is never resumed past the suspension point. The 
callback seems to only be executed on future calls to poll, which will never 
happen because of the suspension, so we have a deadlock. I guess the idea 
behind this API design is that consuming new messages may continue, even though 
commits of previous offsets have not necessarily been processed. However, with 
a coroutine based API, the commitAsync can be sequential before the next poll 
like commitSync, but happen 

[jira] [Updated] (KAFKA-7143) Cannot use KafkaConsumer with Kotlin coroutines due to various issues

2018-07-10 Thread Raman Gupta (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raman Gupta updated KAFKA-7143:
---
Description: 
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
supports a style of async programming that avoids the need for callbacks (and 
existing callback-based APIs can usually be adapted easily to this style with 
a simple wrapper). With coroutines, methods with callbacks are suspended, and 
resumed once the call is complete. With this approach, while access to the 
KafkaConsumer is done in a thread-safe way, it does NOT happen from a single 
thread -- a different underlying thread may actually execute the code after the 
suspension point.

However, the KafkaConsumer includes additional checks to verify not only the 
thread safety of the client, but that the *same thread* is being used -- if the 
same thread (by id) is not being used the consumer throws an exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable this check so that this code can be used 
effectively by libraries such as Kotlin coroutines.

There is a workaround for the above: run the consumer in a coroutine with a 
single-thread context, which isn't ideal because it dedicates a thread to the 
consumer. However, further problems await -- the `commitAsync` method also 
cannot be used with coroutines because the callback is never executed and 
therefore the coroutine is never resumed past the suspension point. The 
callback seems to only be executed on future calls to poll, which will never 
happen because of the suspension, so we have a deadlock. I guess the idea 
behind this API design is that consuming new messages may continue, even though 
commits of previous offsets have not necessarily been processed. However, with 
a coroutine based API, the commitAsync can be sequential before the next poll 
like commitSync, but happen asynchronously without tying up a client 
application thread.

  was:
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
supports a style of async programming that avoids the need for callbacks (and 
existing callback-based APIs such as Kafka's can easily be adapted to this). 
With coroutines, methods with callbacks are suspended, and resumed once the 
call is complete. With this approach, while access to the KafkaConsumer is done 
in a thread-safe way, it does NOT happen from a single thread -- a different 
underlying thread may actually execute the code after the suspension point.

However, the KafkaConsumer includes additional checks to verify not only the 
thread safety of the client, but that the *same thread* is being used -- if the 
same thread (by id) is not being used the consumer throws an exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable this check so that this code can be used 
effectively by libraries such as Kotlin coroutines.

There is a workaround for the above: run the consumer in a coroutine with a 
single-thread context, which isn't ideal because it dedicates a thread to the 
consumer. However, further problems await -- the `commitAsync` method also 
cannot be used with coroutines because the callback is never executed and 
therefore the coroutine is never resumed past the suspension point. The 
callback seems to only be executed on future calls to poll, which will never 
happen because of the suspension, so we have a deadlock. I guess the idea 
behind this API design is that consuming new messages may continue, even though 
commits of previous offsets have not necessarily been processed. However, with 
a coroutine based API, the commitAsync can be sequential before the next poll 
like commitSync, but happen asynchronously without tying up a 

[jira] [Updated] (KAFKA-7143) Cannot use KafkaConsumer with Kotlin coroutines due to various issues

2018-07-10 Thread Raman Gupta (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raman Gupta updated KAFKA-7143:
---
Description: 
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
supports a style of async programming that avoids the need for callbacks (and 
existing callback-based APIs such as Kafka's can easily be adapted to this). 
With coroutines, methods with callbacks are suspended, and resumed once the 
call is complete. With this approach, while access to the KafkaConsumer is done 
in a thread-safe way, it does NOT happen from a single thread -- a different 
underlying thread may actually execute the code after the suspension point.

However, the KafkaConsumer includes additional checks to verify not only the 
thread safety of the client, but that the *same thread* is being used -- if the 
same thread (by id) is not being used the consumer throws an exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable this check so that this code can be used 
effectively by libraries such as Kotlin coroutines.

There is a workaround for the above: run the consumer in a coroutine with a 
single-thread context, which isn't ideal because it dedicates a thread to the 
consumer. However, further problems await -- the `commitAsync` method also 
cannot be used with coroutines because the callback is never executed and 
therefore the coroutine is never resumed past the suspension point. The 
callback seems to only be executed on future calls to poll, which will never 
happen because of the suspension, so we have a deadlock. I guess the idea 
behind this API design is that consuming new messages may continue, even though 
commits of previous offsets have not necessarily been processed. However, with 
a coroutine based API, the commitAsync can be sequential before the next poll 
like commitSync, but happen asynchronously without tying up a client 
application thread.

  was:
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
[coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
supports a style of async programming that avoids the need for callbacks (and 
existing callback-based APIs such as Kafka's can easily be adapted to this). 
With coroutines, methods with callbacks are suspended, and resumed once the 
call is complete. With this approach, while access to the KafkaConsumer is done 
in a thread-safe way, it does NOT happen from a single thread -- a different 
underlying thread may actually execute the code after the suspension point.

However, the KafkaConsumer includes additional checks to verify not only the 
thread safety of the client, but that the *same thread* is being used -- if the 
same thread (by id) is not being used the consumer throws an exception like:

{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" 
java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
multi-threaded access
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}

I understand this check is present to protect people from themselves, but I'd 
like the ability to disable this check so that this code can be used 
effectively by libraries such as Kotlin coroutines.


> Cannot use KafkaConsumer with Kotlin coroutines due to various issues
> -
>
> Key: KAFKA-7143
> URL: https://issues.apache.org/jira/browse/KAFKA-7143
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 1.1.0
>Reporter: Raman Gupta
>Priority: Major
>
> I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
> [coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
> supports a style of async programming that avoids the need for callbacks (and 
> existing callback-based APIs such as Kafka's can easily be adapted to this). 
> With coroutines, methods with callbacks are suspended, and resumed once the 
> call is 

[jira] [Updated] (KAFKA-7143) Cannot use KafkaConsumer with Kotlin coroutines due to various issues

2018-07-10 Thread Raman Gupta (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raman Gupta updated KAFKA-7143:
---
Summary: Cannot use KafkaConsumer with Kotlin coroutines due to various 
issues  (was: Cannot use KafkaConsumer with Kotlin coroutines due to Thread id 
check)

> Cannot use KafkaConsumer with Kotlin coroutines due to various issues
> -
>
> Key: KAFKA-7143
> URL: https://issues.apache.org/jira/browse/KAFKA-7143
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 1.1.0
>Reporter: Raman Gupta
>Priority: Major
>
> I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin 
> [coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which 
> supports a style of async programming that avoids the need for callbacks (and 
> existing callback-based APIs such as Kafka's can easily be adapted to this). 
> With coroutines, methods with callbacks are suspended, and resumed once the 
> call is complete. With this approach, while access to the KafkaConsumer is 
> done in a thread-safe way, it does NOT happen from a single thread -- a 
> different underlying thread may actually execute the code after the 
> suspension point.
> However, the KafkaConsumer includes additional checks to verify not only the 
> thread safety of the client, but that the *same thread* is being used -- if 
> the same thread (by id) is not being used the consumer throws an exception 
> like:
> {code}
> Exception in thread "ForkJoinPool.commonPool-worker-25" 
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
> {code}
> I understand this check is present to protect people from themselves, but I'd 
> like the ability to disable this check so that this code can be used 
> effectively by libraries such as Kotlin coroutines.
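As a point of comparison, the single-thread workaround described above amounts 
to plain thread confinement; a hedged Java sketch of the same idea (topic, 
group, and configs are assumptions):

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConfinedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "confined-demo");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        // All consumer calls run on this one thread, satisfying the
        // same-thread check that trips up coroutine dispatchers.
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        consumerThread.submit(() -> {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("events"));
                while (!Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    records.forEach(r -> System.out.println(r.value()));
                    // commitAsync's callback fires on a later poll() on this
                    // same thread, which is why suspending on it deadlocks.
                    consumer.commitAsync();
                }
            }
        });
    }
}
{code}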



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3438) Rack Aware Replica Reassignment should warn of overloaded brokers

2018-07-10 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-3438:
---
Fix Version/s: 2.1.0

> Rack Aware Replica Reassignment should warn of overloaded brokers
> -
>
> Key: KAFKA-3438
> URL: https://issues.apache.org/jira/browse/KAFKA-3438
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Ben Stopford
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.1.0
>
>
> We've changed the replica reassignment code to be rack aware.
> One problem that might catch users out would be that they rebalance the 
> cluster using kafka-reassign-partitions.sh but their rack configuration means 
> that some high proportion of replicas are pushed onto a single, or small 
> number of, brokers. 
> This should be an easy problem to avoid, by changing the rack assignment 
> information, but we should probably warn users if they are going to create 
> something that is unbalanced. 
> So imagine I have a Kafka cluster of 12 nodes spread over two racks with rack 
> awareness enabled. If I add a 13th machine, on a new rack, and run the 
> rebalance tool, that new machine will get ~6x as many replicas as the least 
> loaded broker. 
> Suggest a warning be added to the tool output when --generate is called: 
> "The most loaded broker has 2.3x as many replicas as the least loaded 
> broker. This is likely due to an uneven distribution of brokers across racks. 
> You're advised to alter the rack config so there are approximately the same 
> number of brokers per rack", and display the individual rack→#brokers and 
> broker→#replicas data for the proposed move.
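A sketch of the suggested imbalance check, assuming the proposed assignment 
has already been reduced to a broker-to-replica-count map; the method name and 
the 2.0 threshold are assumptions:

{code:java}
import java.util.Map;

public class RackImbalanceCheck {
    // Warn when the most loaded broker would carry many more replicas
    // than the least loaded one in the proposed assignment.
    static void warnIfUnbalanced(Map<Integer, Integer> replicasPerBroker) {
        int max = replicasPerBroker.values().stream().max(Integer::compare).orElse(0);
        int min = replicasPerBroker.values().stream().min(Integer::compare).orElse(0);
        if (min > 0 && (double) max / min > 2.0) {  // threshold is an assumption
            System.out.printf(
                "Warning: the most loaded broker has %.1fx as many replicas as "
                + "the least loaded broker; check the rack configuration.%n",
                (double) max / min);
        }
    }
}
{code}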



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3999) Consumer bytes-fetched metric uses decompressed message size (KIP-264)

2018-07-10 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-3999:
---
Labels: kip  (was: )

> Consumer bytes-fetched metric uses decompressed message size (KIP-264)
> --
>
> Key: KAFKA-3999
> URL: https://issues.apache.org/jira/browse/KAFKA-3999
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1, 0.10.0.0
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>Priority: Minor
>  Labels: kip
> Fix For: 2.1.0
>
>
> It looks like the computation for the bytes-fetched metric uses the size of 
> the decompressed message set. I would have expected it to be based on the 
> raw size of the fetch responses. Perhaps it would be helpful to expose both 
> the raw and decompressed fetch sizes?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-07-10 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-7037:
---
Fix Version/s: 2.1.0

> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.1.0
>
>
> While executing a delete command, the Kafka CLI tool mishandles the "+" 
> symbol and deletes the wrong topic. In the case below, deleting 
> _*test+topic*_ causes Kafka to delete _*testtopic*_ instead.
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  delete topic replaces '+' from the topic name  
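A likely root cause, hedged: the topic tools historically match topic names as 
regular expressions, and interpreted as a regex "test+topic" matches the 
literal name "testtopic". A minimal demonstration:

{code:java}
import java.util.regex.Pattern;

public class PlusTopicDemo {
    public static void main(String[] args) {
        // As a regex, "test+topic" means "tes", one or more "t", then
        // "topic" -- which matches the literal topic name "testtopic".
        System.out.println(Pattern.matches("test+topic", "testtopic")); // true
        // Quoting the name would avoid the false match:
        System.out.println(Pattern.matches(Pattern.quote("test+topic"),
                                           "testtopic"));               // false
    }
}
{code}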



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-10 Thread James Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539373#comment-16539373
 ] 

James Cheng commented on KAFKA-7141:


I think it's a little odd that kafka-consumer-groups doesn't show partition 
assignment at all, when there are no offsets.

 

Currently, if there are 2 partitions (partitions 1 and 2):
 * A) Active consumer, no committed offsets on either of them: you see nothing. 
No group assignment, no partitions.
 * B) Active consumer, committed offsets on 1, no committed offsets on 2: you 
will see rows for both of them, but the CURRENT-OFFSET field for partition 2 
will have a hyphen in it.
 * C) Active consumer, committed offsets on both 1 and 2: you will see all the 
data.
 * D) No active consumer, committed offsets on both 1 and 2: you will see the 
rows, but CONSUMER-ID/HOST/CLIENT-ID will have hyphens.

This Jira is talking about "A".
I would have expected "A" to display similarly to "B": you would see partition 
assignments, but "-" wherever there are missing offsets.

> kafka-consumer-group doesn't describe existing group
> 
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0, 1.0.1
>Reporter: Bohdana Panchenko
>Priority: Major
>
> I am running two consumers: an akka-stream-kafka consumer with the standard 
> config section as described in 
> [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html], and 
> kafka-console-consumer.
> The akka-stream-kafka consumer configuration looks like this:
> akka.kafka.consumer {
>   kafka-clients {
>     group.id = "myakkastreamkafka-1"
>     enable.auto.commit = false
>   }
> }
> I am able to see both groups with the command
> *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  _myakkastreamkafka-1_
>  _console-consumer-57171_
> I am able to view details about the console consumer group:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> console-consumer-57171*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST 
> CLIENT-ID_
>  _STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 
> /172.19.0.4 consumer-1_
> But the command to describe my akka stream consumer gives me empty output:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> myakkastreamkafka-1*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST 
> CLIENT-ID_
> That is strange. Can you please check the issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-10 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian resolved KAFKA-7141.

Resolution: Not A Problem

> kafka-consumer-group doesn't describe existing group
> 
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0, 1.0.1
>Reporter: Bohdana Panchenko
>Priority: Major
>
> I am running two consumers: an akka-stream-kafka consumer with the standard 
> config section as described in 
> [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html], and 
> kafka-console-consumer.
> The akka-stream-kafka consumer configuration looks like this:
> akka.kafka.consumer {
>   kafka-clients {
>     group.id = "myakkastreamkafka-1"
>     enable.auto.commit = false
>   }
> }
> I am able to see both groups with the command
> *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  _myakkastreamkafka-1_
>  _console-consumer-57171_
> I am able to view details about the console consumer group:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> console-consumer-57171*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST 
> CLIENT-ID_
>  _STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 
> /172.19.0.4 consumer-1_
> But the command to describe my akka stream consumer gives me empty output:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> myakkastreamkafka-1*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST 
> CLIENT-ID_
> That is strange. Can you please check the issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-10 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539282#comment-16539282
 ] 

Vahid Hashemian commented on KAFKA-7141:


[~kioria] thanks for the detailed info. What you're seeing makes sense. Until 
there is an offset commit within a group, there is nothing to register (an 
offset commit indicates successful consumption). Even with auto offset commit 
disabled, in normal scenarios offsets should be committed regularly once the 
processing of consumed records is complete.
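For reference, a minimal manual-commit poll loop of the kind described; the 
topic and group names are taken from the ticket output, the rest is assumed:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "myakkastreamkafka-1");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("STREAM-TEST"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                records.forEach(r -> System.out.println(r.value()));
                // Committing after processing is what makes the group's
                // partition offsets visible to --describe.
                consumer.commitSync();
            }
        }
    }
}
{code}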

I don't think there is anything to fix here. I'll close the Jira for now. If 
you still see an issue, please reopen and provide additional info about it. 
Thanks!

> kafka-consumer-group doesn't describe existing group
> 
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0, 1.0.1
>Reporter: Bohdana Panchenko
>Priority: Major
>
> I am running two consumers: an akka-stream-kafka consumer with the standard 
> config section as described in 
> [https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html], and 
> kafka-console-consumer.
> The akka-stream-kafka consumer configuration looks like this:
> akka.kafka.consumer {
>   kafka-clients {
>     group.id = "myakkastreamkafka-1"
>     enable.auto.commit = false
>   }
> }
> I am able to see both groups with the command
> *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  _myakkastreamkafka-1_
>  _console-consumer-57171_
> I am able to view details about the console consumer group:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> console-consumer-57171*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST 
> CLIENT-ID_
>  _STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 
> /172.19.0.4 consumer-1_
> But the command to describe my akka stream consumer gives me empty output:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> myakkastreamkafka-1*
>  _Note: This will not show information about old Zookeeper-based consumers._
>  _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST 
> CLIENT-ID_
> That is strange. Can you please check the issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7147) ReassignPartitionsCommand should be able to connect to broker over SSL

2018-07-10 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7147:

Description: 
Currently both ReassignPartitionsCommand and LogDirsCommand instantiate 
AdminClient using only the bootstrap.servers and client.id provided by the 
user. Since they do not pass along other SSL-related properties, these tools 
cannot talk to brokers over SSL.

To solve this problem, these tools should allow users to provide a properties 
file containing configs to be passed to the AdminClient.

  was:
Currently both ReassignPartitionsCommand and LogDirsCommand instantiate 
AdminClient using only the bootstrap.servers and client.id provided by the 
user. Since they do not pass along other SSL-related properties, these tools 
cannot talk to brokers over SSL.

To solve this problem, these tools should allow users to provide a properties 
file containing configs to be passed to the AdminClient.

 

 


> ReassignPartitionsCommand should be able to connect to broker over SSL
> --
>
> Key: KAFKA-7147
> URL: https://issues.apache.org/jira/browse/KAFKA-7147
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently both ReassignPartitionsCommand and LogDirsCommand instantiate 
> AdminClient using only the bootstrap.servers and client.id provided by the 
> user. Since they do not pass along other SSL-related properties, these tools 
> cannot talk to brokers over SSL.
> To solve this problem, these tools should allow users to provide a properties 
> file containing configs to be passed to the AdminClient.
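A sketch of the proposed mechanism: load a user-supplied properties file (the 
file name and SSL keys shown are illustrative) and pass everything through to 
the AdminClient:

{code:java}
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class CommandConfigDemo {
    public static void main(String[] args) throws Exception {
        // e.g. admin.properties containing:
        //   security.protocol=SSL
        //   ssl.truststore.location=/path/to/truststore.jks
        //   ssl.truststore.password=...
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get("admin.properties"))) {
            props.load(in);
        }
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093");
        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println(admin.describeCluster().nodes().get());
        }
    }
}
{code}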



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7121) Intermittently, Connectors fail to assign tasks and keep retrying every second forever.

2018-07-10 Thread Gwen Shapira (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539264#comment-16539264
 ] 

Gwen Shapira commented on KAFKA-7121:
-

Oh, sorry [~yuzhih...@gmail.com], I forgot to update:
We used the 1.1.0 release.
We resolved the issue by setting the advertised host for the Connect workers. 
The real issue was that the Connect workers couldn't talk to the HTTP leader.

There are a few layers of problems here:
1. When the advertised host isn't set, workers end up picking the wrong IP to 
advertise.
2. When workers can't talk to the leader, the error is completely misleading 
(we assume that the only reason you can't find the leader is a rebalance, but 
this is a distributed system; there are 500 reasons why 2 nodes can't talk to 
each other).
3. We keep retrying forever in this scenario (and logging 10 times per 
second). I'm not sure that is the right thing to do.
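For anyone hitting the same symptom, the fix described above corresponds to 
the Connect distributed worker REST settings; a hedged example with 
placeholder values:

{code}
# connect-distributed.properties
# Advertise an address other workers can actually reach; otherwise
# follower-to-leader task reconfiguration requests fail as above.
rest.advertised.host.name=worker1.example.com
rest.advertised.port=8083
{code}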

> Intermittently, Connectors fail to assign tasks and keep retrying every 
> second forever.
> ---
>
> Key: KAFKA-7121
> URL: https://issues.apache.org/jira/browse/KAFKA-7121
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gwen Shapira
>Assignee: Konstantine Karantasis
>Priority: Major
>
> We started a connector, and even though it is in RUNNING status, tasks are 
> not getting assigned:
> {"name":"prod-xxx-v2","connector":{"state":"RUNNING","worker_id":"0.0.0.0:8083"},"tasks":[],"type":"sink"}
> Other connectors are running without issues.
> Attempt to restart the connector returned 409 status.
> Logs show the following messages, keep repeating for hours:
> [2018-06-29 20:23:19,288] ERROR Task reconfiguration for prod-xxx-v2 failed 
> unexpectedly, this connector will not be properly reconfigured unless 
> manually triggered. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:956)
> [2018-06-29 20:23:19,289] INFO 10.200.149.201 - - [29/Jun/2018:20:23:19 
> +] "POST /connectors/prod-xxx-v2/tasks?forward=false HTTP/1.1" 409 113 0 
> (org.apache.kafka.connect.runtime.rest.RestServer:60)
> [2018-06-29 20:23:19,289] INFO 10.200.149.201 - - [29/Jun/2018:20:23:19 
> +] "POST /connectors/prod-xxx-v2/tasks?forward=true HTTP/1.1" 409 113 1 
> (org.apache.kafka.connect.runtime.rest.RestServer:60)
> [2018-06-29 20:23:19,289] INFO 10.200.149.201 - - [29/Jun/2018:20:23:19 
> +] "POST /connectors/prod-xxx-v2/tasks HTTP/1.1" 409 113 1 
> (org.apache.kafka.connect.runtime.rest.RestServer:60)
> [2018-06-29 20:23:19,289] ERROR Request to leader to reconfigure connector 
> tasks failed 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1018)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Cannot 
> complete request because of a conflicting operation (e.g. worker rebalance)
>  at 
> org.apache.kafka.connect.runtime.rest.RestServer.httpRequest(RestServer.java:229)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$18.run(DistributedHerder.java:1015)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7147) ReassignPartitionsCommand should be able to connect to broker over SSL

2018-07-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539258#comment-16539258
 ] 

ASF GitHub Bot commented on KAFKA-7147:
---

lindong28 opened a new pull request #5355: KAFKA-7147; 
ReassignPartitionsCommand should be able to connect to broker over SSL
URL: https://github.com/apache/kafka/pull/5355
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ReassignPartitionsCommand should be able to connect to broker over SSL
> --
>
> Key: KAFKA-7147
> URL: https://issues.apache.org/jira/browse/KAFKA-7147
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently both ReassignPartitionsCommand and LogDirsCommand instantiate the 
> AdminClient using only the bootstrap.servers and client.id provided by the 
> user. Since they do not pass along other SSL-related properties, these tools 
> cannot talk to brokers over SSL.
> In order to solve this problem, these tools should allow users to provide a 
> properties file containing configs to be passed to the AdminClient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7147) ReassignPartitionsCommand should be able to connect to broker over SSL

2018-07-10 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7147:
---

 Summary: ReassignPartitionsCommand should be able to connect to 
broker over SSL
 Key: KAFKA-7147
 URL: https://issues.apache.org/jira/browse/KAFKA-7147
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin


Currently both ReassignPartitionsCommand and LogDirsCommand instantiate the 
AdminClient using only the bootstrap.servers and client.id provided by the 
user. Since they do not pass along other SSL-related properties, these tools 
cannot talk to brokers over SSL.

In order to solve this problem, these tools should allow users to provide a 
properties file containing configs to be passed to the AdminClient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7146) Grouping consumer requests per consumer coordinator in admin client in describeConsumerGroups

2018-07-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539129#comment-16539129
 ] 

ASF GitHub Bot commented on KAFKA-7146:
---

shunge opened a new pull request #5353: KAFKA-7146: Grouping consumer requests 
per consumer coordinator in admin client in describeConsumerGroups
URL: https://github.com/apache/kafka/pull/5353
 
 
   This is a subtask from KAFKA-6788 
(https://issues.apache.org/jira/browse/KAFKA-6788)
   
   Basically the idea is to use thread-safe collections like 
ConcurrentLinkedQueue and ConcurrentHashMap to save the result of each 
"findCoordinator" response. In the map, the coordinator node id is the key, 
and the value is the collection of group ids hosted on that coordinator. 
   
   When all the group ids have been processed (either successfully or 
exceptionally), we iterate through the map and group the requests by 
coordinator, as sketched below.
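
   A minimal sketch of that grouping step (names are hypothetical, not the 
actual KafkaAdminClient internals):

{code}
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

class CoordinatorGrouping {
    // Coordinator node id -> group ids hosted on that coordinator.
    private final ConcurrentMap<Integer, Queue<String>> groupsByCoordinator =
            new ConcurrentHashMap<>();

    // Called as each "findCoordinator" response arrives, possibly concurrently.
    void record(int coordinatorNodeId, String groupId) {
        groupsByCoordinator
                .computeIfAbsent(coordinatorNodeId, id -> new ConcurrentLinkedQueue<>())
                .add(groupId);
    }

    // Once every group id has completed (successfully or exceptionally), iterate
    // the map and send one describe request per coordinator for all its groups.
}
{code}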
   
   This PR is mainly for discussion; there are a couple of concerns:
   
   1. Code logic: is there anything wrong with the logic of this solution?
   2. Concurrency: do these collections guarantee thread safety?
   3. Memory: this uses many collections to store information; memory usage 
should be cut down.
   4. Efficiency: is this more efficient than the previous implementation?
   5. Naming issues (I already noticed a typo).
   6. More unit tests.
   
   I am open to all kinds of suggestions and feedback, and will make several 
other commits after I receive opinions. 
   
   P.S. Is a KIP needed?
   
   @guozhangwang @cmccabe Thank you!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Grouping consumer requests per consumer coordinator in admin client in 
> describeConsumerGroups
> -
>
> Key: KAFKA-7146
> URL: https://issues.apache.org/jira/browse/KAFKA-7146
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yishun Guan
>Assignee: Yishun Guan
>Priority: Critical
>
> Subtask of KAFKA-6788. Group consumer requests for describeConsumerGroups().
>  
> According to KAFKA-6788:
>  
> "In KafkaAdminClient, for some requests like describeGroup and deleteGroup, 
> we will first try to get the coordinator for each requested group id, and 
> then send the corresponding request for that group id. However, different 
> group ids could be hosted on the same coordinator, and these requests do 
> support multi group ids be sent within the same request. So we can consider 
> optimize it by grouping the requests per coordinator destination."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7146) Grouping consumer requests per consumer coordinator in admin client in describeConsumerGroups

2018-07-10 Thread Yishun Guan (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yishun Guan updated KAFKA-7146:
---
Description: 
Subtask of KAFKA-6788. Group consumer requests for describeConsumerGroups().

 

According to KAFKA-6788:

 

"In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we 
will first try to get the coordinator for each requested group id, and then 
send the corresponding request for that group id. However, different group ids 
could be hosted on the same coordinator, and these requests do support multi 
group ids be sent within the same request. So we can consider optimize it by 
grouping the requests per coordinator destination."

  was:
Subtask of KAFKA-6788. Group consumer requests for describeConsumerGroups().

 

According to KAFKA-6788:

 

"
In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we 
will first try to get the coordinator for each requested group id, and then 
send the corresponding request for that group id. However, different group ids 
could be hosted on the same coordinator, and these requests do support 
multiple group ids being sent within the same request. So we can consider 
optimizing it by grouping the requests per coordinator destination.

"


> Grouping consumer requests per consumer coordinator in admin client in 
> describeConsumerGroups
> -
>
> Key: KAFKA-7146
> URL: https://issues.apache.org/jira/browse/KAFKA-7146
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yishun Guan
>Priority: Critical
>
> Subtask of KAFKA-6788. Group consumer requests for describeConsumerGroups().
>  
> According to KAFKA-6788:
>  
> "In KafkaAdminClient, for some requests like describeGroup and deleteGroup, 
> we will first try to get the coordinator for each requested group id, and 
> then send the corresponding request for that group id. However, different 
> group ids could be hosted on the same coordinator, and these requests do 
> support multi group ids be sent within the same request. So we can consider 
> optimize it by grouping the requests per coordinator destination."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7146) Grouping consumer requests per consumer coordinator in admin client in describeConsumerGroups

2018-07-10 Thread Yishun Guan (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yishun Guan reassigned KAFKA-7146:
--

Assignee: Yishun Guan

> Grouping consumer requests per consumer coordinator in admin client in 
> describeConsumerGroups
> -
>
> Key: KAFKA-7146
> URL: https://issues.apache.org/jira/browse/KAFKA-7146
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yishun Guan
>Assignee: Yishun Guan
>Priority: Critical
>
> Subtask of KAFKA-6788. Group consumer requests for describeConsumerGroups().
>  
> According to KAFKA-6788:
>  
> "In KafkaAdminClient, for some requests like describeGroup and deleteGroup, 
> we will first try to get the coordinator for each requested group id, and 
> then send the corresponding request for that group id. However, different 
> group ids could be hosted on the same coordinator, and these requests do 
> support multi group ids be sent within the same request. So we can consider 
> optimize it by grouping the requests per coordinator destination."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7146) Grouping consumer requests per consumer coordinator in admin client in describeConsumerGroups

2018-07-10 Thread Yishun Guan (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yishun Guan updated KAFKA-7146:
---
Description: 
Subtask of KAFKA-6788. Group consumer requests for describeConsumerGroups().

 

According to KAFKA-6788:

 

"
In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we 
will first try to get the coordinator for each requested group id, and then 
send the corresponding request for that group id. However, different group ids 
could be hosted on the same coordinator, and these requests do support 
multiple group ids being sent within the same request. So we can consider 
optimizing it by grouping the requests per coordinator destination.

"

  was:Subtask of KAFKA-6788. Group consumer requests for 
describeConsumerGroups().


> Grouping consumer requests per consumer coordinator in admin client in 
> describeConsumerGroups
> -
>
> Key: KAFKA-7146
> URL: https://issues.apache.org/jira/browse/KAFKA-7146
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yishun Guan
>Priority: Critical
>
> Subtask of KAFKA-6788. Group consumer requests for describeConsumerGroups().
>  
> According to KAFKA-6788:
>  
> "
> In KafkaAdminClient, for some requests like describeGroup and deleteGroup, 
> we will first try to get the coordinator for each requested group id, and 
> then send the corresponding request for that group id. However, different 
> group ids could be hosted on the same coordinator, and these requests do 
> support multiple group ids being sent within the same request. So we can 
> consider optimizing it by grouping the requests per coordinator destination.
> "



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7146) Grouping consumer requests per consumer coordinator in admin client in describeConsumerGroups

2018-07-10 Thread Yishun Guan (JIRA)
Yishun Guan created KAFKA-7146:
--

 Summary: Grouping consumer requests per consumer coordinator in 
admin client in describeConsumerGroups
 Key: KAFKA-7146
 URL: https://issues.apache.org/jira/browse/KAFKA-7146
 Project: Kafka
  Issue Type: Sub-task
Reporter: Yishun Guan


Subtask of KAFKA-6788. Group consumer requests for describeConsumerGroups().



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5638) Inconsistency in consumer group related ACLs

2018-07-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539086#comment-16539086
 ] 

ASF GitHub Bot commented on KAFKA-5638:
---

vahidhashemian opened a new pull request #5352: KAFKA-5638: Improve the 
Required ACL of ListGroups API (KIP-231)
URL: https://github.com/apache/kafka/pull/5352
 
 
   
[KIP-231](https://cwiki.apache.org/confluence/display/KAFKA/KIP-231%3A+Improve+the+Required+ACL+of+ListGroups+API)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Inconsistency in consumer group related ACLs
> 
>
> Key: KAFKA-5638
> URL: https://issues.apache.org/jira/browse/KAFKA-5638
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Vahid Hashemian
>Assignee: Vahid Hashemian
>Priority: Minor
>  Labels: kip
> Fix For: 2.1.0
>
>
> Users can see all groups in the cluster (using consumer group’s {{--list}} 
> option) provided that they have {{Describe}} access to the cluster. It would 
> make more sense to modify that experience and limit what is listed in the 
> output to only those groups they have {{Describe}} access to. The reason is, 
> almost everything else is accessible by a user only if the access is 
> specifically granted (through ACL {{--add}}); and this scenario should not be 
> an exception. The potential change would be updating the minimum required 
> permission of {{ListGroup}} from {{Describe (Cluster)}} to {{Describe 
> (Group)}}.
> We can also look at this issue from a different angle: A user with {{Read}} 
> access to a group can describe the group, but the same user would not see 
> anything when listing groups (assuming there is no {{Describe}} access to the 
> cluster). It makes more sense for this user to be able to list all groups 
> s/he can already describe.
> It would be great to know if any user is relying on the existing behavior 
> (listing all consumer groups using a {{Describe (Cluster)}} ACL).
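
For context, under the proposed change a group would appear in the {{--list}} 
output for a user once that user is granted group-level Describe, e.g. 
(principal and group name are hypothetical):

{code}
# Hypothetical principal and group; grants Describe on a single group.
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:alice --operation Describe --group my-group
{code}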



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore

2018-07-10 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-7080.
-
Resolution: Fixed

Fixed in https://github.com/apache/kafka/pull/5257

> WindowStoreBuilder incorrectly initializes CachingWindowStore
> -
>
> Key: KAFKA-7080
> URL: https://issues.apache.org/jira/browse/KAFKA-7080
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0, 1.0.1, 1.1.0, 2.0.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.1.0
>
>
> When caching is enabled on the WindowStoreBuilder, it creates a 
> CachingWindowStore. However, it incorrectly passes storeSupplier.segments() 
> (the number of segments) to the segmentInterval argument.
>  
> The impact is low, since any valid number of segments is also a valid segment 
> size, but it likely results in much smaller segments than intended. For 
> example, the segments may be sized 3ms instead of 60,000ms.
>  
> Ideally the WindowBytesStoreSupplier interface would allow suppliers to 
> advertise their segment size instead of segment count. I plan to create a KIP 
> to propose this.
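
A small worked illustration of the mix-up (values hypothetical, not the actual 
internal API):

{code}
// The builder passes a segment *count* where a segment *interval* (in ms) is
// expected, so the count is silently interpreted as an interval.
long numSegments = 3L;               // what storeSupplier.segments() returns
long intendedIntervalMs = 60_000L;   // the interval the caching layer expects
long actualIntervalMs = numSegments; // bug: 3 ms segments instead of 60,000 ms
{code}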



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-10 Thread Bohdana Panchenko (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538861#comment-16538861
 ] 

Bohdana Panchenko commented on KAFKA-7141:
--

[~vahid], I have updates from my side. 

First: I checked setting enable.auto.commit = true.

After that change, even with the empty topic partition, I got the desired 
output for the describe command. Is that so by design?

*$ kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
myakkastreamkafka-1*
_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
_STREAM-TEST 0 0 0 0 consumer-1-7f68330a-9c2f-4694-9978-0378368e6000 
/172.19.0.8 consumer-1_

Second: that made me think I was doing something wrong while using the 
consumer's commit API for manual offset management.

I set enable.auto.commit = false.

I double-checked and verified that I was not committing offsets properly. I 
fixed that, and the result is:
before manually committing offsets - no output. Is that by design?

*$ kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
myakkastreamkafka-1*
_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_

after correctly manually committing offsets - desired output:

*$ kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
myakkastreamkafka-1*
_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
_STREAM-TEST 0 5 5 0 consumer-1-868599a6-272a-4929-9b28-b67de153fab4 
/172.17.0.1 consumer-1_

So to summarize: is it the case that, for the group to be properly described, 
you must either use automatic offset management or wait for the first offset 
commit from the consumer for that group?

Thank you in advance.
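
A minimal sketch of the manual-commit path that makes the group describable 
(topic and group id taken from the commands above; API as in the 1.x Java 
client):

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "myakkastreamkafka-1");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("STREAM-TEST"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            // ... process records ...
            // Until this commit the group has no committed offsets, so
            // --describe prints only the header line.
            consumer.commitSync();
        }
    }
}
{code}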

> kafka-consumer-group doesn't describe existing group
> 
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0, 1.0.1
>Reporter: Bohdana Panchenko
>Priority: Major
>
> I am running two consumers: an akka-stream-kafka consumer with the standard 
> config section as described in 
> https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html, and 
> kafka-console-consumer.
> The akka-stream-kafka consumer configuration looks like this:
> akka.kafka.consumer {
>   kafka-clients {
>     group.id = "myakkastreamkafka-1"
>     enable.auto.commit = false
>   }
> }
> I am able to see both groups with the command
> *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
> _Note: This will not show information about old Zookeeper-based consumers._
> _myakkastreamkafka-1_
> _console-consumer-57171_
> I am able to view details about the console consumer group:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> console-consumer-57171*
> _Note: This will not show information about old Zookeeper-based consumers._
> _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
> _STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 
> /172.19.0.4 consumer-1_
> But the command to describe my akka stream consumer gives me empty output:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> myakkastreamkafka-1*
> _Note: This will not show information about old Zookeeper-based consumers._
> _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
> That is strange. Can you please check the issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-10 Thread Bohdana Panchenko (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538861#comment-16538861
 ] 

Bohdana Panchenko edited comment on KAFKA-7141 at 7/10/18 4:06 PM:
---

[~vahid], I have updates from my side. 

First: I checked setting enable.auto.commit = true.

After that change, even with the empty topic partition, I got the desired 
output for the describe command. Is that so by design?

*$ kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
myakkastreamkafka-1*
_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
_STREAM-TEST 0 0 0 0 consumer-1-7f68330a-9c2f-4694-9978-0378368e6000 
/172.19.0.8 consumer-1_

Second: that made me think I was doing something wrong while using the 
consumer's commit API for manual offset management.

I set enable.auto.commit = false.

I double-checked and verified that I was not committing offsets properly. I 
fixed that, and the result is:
before manually committing offsets - no output. Is that by design?

*$ kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
myakkastreamkafka-1*
_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_

after correctly manually committing offsets - desired output:

*$ kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
myakkastreamkafka-1*
_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
_STREAM-TEST 0 5 5 0 consumer-1-868599a6-272a-4929-9b28-b67de153fab4 
/172.17.0.1 consumer-1_

So to summarize: is it the case that, for the group to be properly described, 
you must either use automatic offset management or wait for the first offset 
commit from the consumer for that group?

Thank you in advance.


was (Author: kioria):
[~vahid], I have updates from my side. 

First: I checked setting enable.auto.commit = true.

After that change, even with the empty topic partition, I got the desired 
output for the describe command. Is that so by design?

*$ kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
myakkastreamkafka-1*
_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
_STREAM-TEST 0 0 0 0 consumer-1-7f68330a-9c2f-4694-9978-0378368e6000 
/172.19.0.8 consumer-1_

Second: that made me think I was doing something wrong while using the 
consumer's commit API for manual offset management.

I set enable.auto.commit = false.

I double-checked and verified that I was not committing offsets properly. I 
fixed that, and the result is:
before manually committing offsets - no output. Is that by design?

*$ kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
myakkastreamkafka-1*
_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_

after correctly manually committing offsets - desired output:

*$ kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
myakkastreamkafka-1*
_Note: This will not show information about old Zookeeper-based consumers._
_TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
_STREAM-TEST 0 5 5 0 consumer-1-868599a6-272a-4929-9b28-b67de153fab4 
/172.17.0.1 consumer-1_

So to summarize: is it the case that, for the group to be properly described, 
you must either use automatic offset management or wait for the first offset 
commit from the consumer for that group?

Thank you in advance.

> kafka-consumer-group doesn't describe existing group
> 
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0, 1.0.1
>Reporter: Bohdana Panchenko
>Priority: Major
>
> I am running two consumers: akka-stream-kafka consumer with standard config 
> section as 

[jira] [Commented] (KAFKA-1774) REPL and Shell Client for Admin Message RQ/RP

2018-07-10 Thread Viktor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538687#comment-16538687
 ] 

Viktor Somogyi commented on KAFKA-1774:
---

[~abiletskyi] do you still work on this? We have a spike implementation which 
uses the new AdminClient. We'd like to publish it in the foreseeable future, 
so if you don't mind I'd like to pick up this task, integrate your work, and 
continue from there.

> REPL and Shell Client for Admin Message RQ/RP
> -
>
> Key: KAFKA-1774
> URL: https://issues.apache.org/jira/browse/KAFKA-1774
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Joe Stein
>Assignee: Andrii Biletskyi
>Priority: Major
>
> We should have a REPL we can work in and execute commands with their 
> arguments. With this we can do:
> ./kafka.sh --shell 
> kafka>attach cluster -b localhost:9092;
> kafka>describe topic sampleTopicNameForExample;
> The command-line version can work like it does now so folks don't have to 
> re-write all of their tooling:
> kafka.sh --topics ... everything the same as kafka-topics.sh 
> kafka.sh --reassign ... everything the same as kafka-reassign-partitions.sh 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5846) Use singleton NoOpConsumerRebalanceListener in subscribe() call where listener is not specified

2018-07-10 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191405#comment-16191405
 ] 

Ted Yu edited comment on KAFKA-5846 at 7/10/18 12:16 PM:
-

+1 from me


was (Author: yuzhih...@gmail.com):
+1

> Use singleton NoOpConsumerRebalanceListener in subscribe() call where 
> listener is not specified
> ---
>
> Key: KAFKA-5846
> URL: https://issues.apache.org/jira/browse/KAFKA-5846
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Reporter: Ted Yu
>Assignee: Kamal Chandraprakash
>Priority: Minor
>
> Currently KafkaConsumer creates an instance of NoOpConsumerRebalanceListener 
> for each subscribe() call where a ConsumerRebalanceListener is not specified:
> {code}
> public void subscribe(Pattern pattern) {
>     subscribe(pattern, new NoOpConsumerRebalanceListener());
> }
> {code}
> We can create a singleton NoOpConsumerRebalanceListener to be used in such 
> scenarios.
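
A minimal sketch of the proposed change inside KafkaConsumer (the field name 
is hypothetical):

{code}
// Reuse one stateless listener rather than allocating a new
// NoOpConsumerRebalanceListener on every subscribe() call.
private static final NoOpConsumerRebalanceListener NO_OP_LISTENER =
        new NoOpConsumerRebalanceListener();

public void subscribe(Pattern pattern) {
    subscribe(pattern, NO_OP_LISTENER);
}
{code}

Since the listener holds no state, sharing a single instance across 
subscribe() calls is safe.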



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5943) Reduce dependency on mock in connector tests

2018-07-10 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5943:
--
Description: 
Currently connector tests make heavy use of mocks (EasyMock, PowerMock).

This may hide the real logic behind operations and make finding bugs difficult.


We should reduce the use of mocks so that developers can debug connector code 
using unit tests.
This would shorten the development cycle for connectors.

  was:
Currently connector tests make heavy use of mocks (EasyMock, PowerMock).

This may hide the real logic behind operations and make finding bugs difficult.

We should reduce the use of mocks so that developers can debug connector code 
using unit tests.
This would shorten the development cycle for connectors.


> Reduce dependency on mock in connector tests
> 
>
> Key: KAFKA-5943
> URL: https://issues.apache.org/jira/browse/KAFKA-5943
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>  Labels: connector, mock
>
> Currently connector tests make heavy use of mocks (EasyMock, PowerMock).
> This may hide the real logic behind operations and make finding bugs 
> difficult.
> We should reduce the use of mocks so that developers can debug connector 
> code using unit tests.
> This would shorten the development cycle for connectors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-10 Thread Bohdana Panchenko (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538400#comment-16538400
 ] 

Bohdana Panchenko commented on KAFKA-7141:
--

Steps to reproduce the issue:
I am using Windows and Docker Client for Windows. The Kafka broker is running 
as part of the Landoop distribution (https://github.com/Landoop/fast-data-dev). 
The setup looks like this:
h4. Kafka 1.0.1-L0 @ Landoop's Apache Kafka Distribution

1× Broker, 1× Schema Registry, 1× Connect Distributed Worker, 1× REST Proxy, 1× 
Zookeeper

My Alpakka Kafka connector client and my Scala producer are running in other 
docker containers; all are in the custom bridge network on my local dev 
machine.

I can see my topic in the Landoop topic UI and all the messages submitted by 
my Scala producer. My Alpakka consumer processes them happily and delegates 
offset committing to the Alpakka library (enable.auto.commit = false; see 
https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#offset-storage-in-kafka-committing).
 
1) Start the fast-data-dev broker, any producer, and the Alpakka consumer, all 
in separate docker containers. Start the console consumer from the kafka 
command line.

2) Send a message; verify it has been received by both consumers.

3) Describe both consumers, and compare the output of the describe command for 
the console consumer with the output for the Alpakka consumer.

What do you mean by "after manually committing offsets"?

I am using enable.auto.commit = false; is that not enough?

 

> kafka-consumer-group doesn't describe existing group
> 
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0, 1.0.1
>Reporter: Bohdana Panchenko
>Priority: Major
>
> I am running two consumers: an akka-stream-kafka consumer with the standard 
> config section as described in 
> https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html, and 
> kafka-console-consumer.
> The akka-stream-kafka consumer configuration looks like this:
> akka.kafka.consumer {
>   kafka-clients {
>     group.id = "myakkastreamkafka-1"
>     enable.auto.commit = false
>   }
> }
> I am able to see both groups with the command
> *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
> _Note: This will not show information about old Zookeeper-based consumers._
> _myakkastreamkafka-1_
> _console-consumer-57171_
> I am able to view details about the console consumer group:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> console-consumer-57171*
> _Note: This will not show information about old Zookeeper-based consumers._
> _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
> _STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 
> /172.19.0.4 consumer-1_
> But the command to describe my akka stream consumer gives me empty output:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> myakkastreamkafka-1*
> _Note: This will not show information about old Zookeeper-based consumers._
> _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
> That is strange. Can you please check the issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-10 Thread Bohdana Panchenko (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538255#comment-16538255
 ] 

Bohdana Panchenko commented on KAFKA-7141:
--

[~vahid], let me check and I will be back.

> kafka-consumer-group doesn't describe existing group
> 
>
> Key: KAFKA-7141
> URL: https://issues.apache.org/jira/browse/KAFKA-7141
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0, 1.0.1
>Reporter: Bohdana Panchenko
>Priority: Major
>
> I am running two consumers: an akka-stream-kafka consumer with the standard 
> config section as described in 
> https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html, and 
> kafka-console-consumer.
> The akka-stream-kafka consumer configuration looks like this:
> akka.kafka.consumer {
>   kafka-clients {
>     group.id = "myakkastreamkafka-1"
>     enable.auto.commit = false
>   }
> }
> I am able to see both groups with the command
> *kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list*
> _Note: This will not show information about old Zookeeper-based consumers._
> _myakkastreamkafka-1_
> _console-consumer-57171_
> I am able to view details about the console consumer group:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> console-consumer-57171*
> _Note: This will not show information about old Zookeeper-based consumers._
> _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
> _STREAM-TEST 0 0 0 0 consumer-1-6b928e07-196a-4322-9928-068681617878 
> /172.19.0.4 consumer-1_
> But the command to describe my akka stream consumer gives me empty output:
> *kafka-consumer-groups --describe --bootstrap-server 127.0.0.1:9092 --group 
> myakkastreamkafka-1*
> _Note: This will not show information about old Zookeeper-based consumers._
> _TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID_
> That is strange. Can you please check the issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)