[jira] [Commented] (KAFKA-6978) Make Streams Window retention time strict

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520915#comment-16520915
 ] 

ASF GitHub Bot commented on KAFKA-6978:
---

guozhangwang closed pull request #5218: KAFKA-6978: make window retention time 
strict
URL: https://github.com/apache/kafka/pull/5218
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index a7747629948..4be1880083f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
 import org.slf4j.Logger;
@@ -32,7 +33,7 @@
 import java.util.Map;
 
 public class KStreamWindowAggregate implements 
KStreamAggProcessorSupplier, V, T> {
-private static final Logger LOG = LoggerFactory.getLogger(KStreamWindowAggregate.class);
+private final Logger log = LoggerFactory.getLogger(getClass());
 
 private final String storeName;
 private final Windows windows;
@@ -66,11 +67,14 @@ public void enableSendingOldValues() {
 private WindowStore windowStore;
 private TupleForwarder, T> tupleForwarder;
 private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
 
 @SuppressWarnings("unchecked")
 @Override
 public void init(final ProcessorContext context) {
 super.init(context);
+this.internalProcessorContext = (InternalProcessorContext) context;
+
 metrics = (StreamsMetricsImpl) context.metrics();
 
 windowStore = (WindowStore) context.getStateStore(storeName);
@@ -82,7 +86,7 @@ public void process(final K key, final V value) {
 // if the key is null, we do not need proceed aggregating the record
 // the record with the table
 if (key == null) {
-LOG.warn(
+log.warn(
 "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
 value, context().topic(), context().partition(), context().offset()
 );
@@ -92,21 +96,32 @@ public void process(final K key, final V value) {
 
 // first get the matching windows
 final long timestamp = context().timestamp();
+final long expiryTime = internalProcessorContext.streamTime() - windows.maintainMs();
+
 final Map matchedWindows = windows.windowsFor(timestamp);
 
 // try update the window, and create the new window for the rest of unmatched window that do not exist yet
 for (final Map.Entry entry : matchedWindows.entrySet()) {
-T oldAgg = windowStore.fetch(key, entry.getKey());
-
-if (oldAgg == null) {
-oldAgg = initializer.apply();
+final Long windowStart = entry.getKey();
+if (windowStart > expiryTime) {
+T oldAgg = windowStore.fetch(key, windowStart);
+
+if (oldAgg == null) {
+oldAgg = initializer.apply();
+}
+
+final T newAgg = aggregator.apply(key, value, oldAgg);
+
+// update the store with the new value
+windowStore.put(key, newAgg, windowStart);
+tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
+} else {
+log.warn(
+"Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{}] expiration=[{}]",
+key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, expiryTime
+);
+metrics.skippedRecordsSensor().record();
 }
-
-final T newAgg = aggregator.apply(key, value, oldAgg);
-
-// update the store with the new value
-windowStore.put(key, newAgg, 

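The strict-retention check added in this diff can be sketched in isolation. This is a minimal standalone Java sketch, assuming hopping windows described by a size and an advance; `windowStartsFor` and `liveWindowStarts` are hypothetical helpers, not Kafka APIs. A matched window is updated only when its start is greater than `streamTime - retention`, mirroring `windowStart > expiryTime` above; every other matched window is skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone sketch of the strict retention check; not Kafka code.
final class StrictWindowRetentionSketch {

    // Start times of all hopping windows [start, start + sizeMs) containing timestamp.
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long start = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // Keep only windows whose start is after streamTime - retentionMs,
    // mirroring the `windowStart > expiryTime` check in the diff.
    static List<Long> liveWindowStarts(long timestamp, long streamTime, long retentionMs,
                                       long sizeMs, long advanceMs) {
        long expiryTime = streamTime - retentionMs;
        List<Long> live = new ArrayList<>();
        for (long windowStart : windowStartsFor(timestamp, sizeMs, advanceMs)) {
            if (windowStart > expiryTime) {
                live.add(windowStart);
            }
            // Otherwise the processor logs a warning and records a skipped record.
        }
        return live;
    }

    public static void main(String[] args) {
        // Window size 10 ms, advance 5 ms, retention 20 ms, stream time already at 100.
        System.out.println(liveWindowStarts(78, 100, 20, 10, 5)); // prints []
        System.out.println(liveWindowStarts(98, 100, 20, 10, 5)); // prints [90, 95]
    }
}
```

With stream time at 100 and 20 ms retention, the late record at timestamp 78 matches only the windows starting at 70 and 75, both at or below the expiry time of 80, so it is skipped entirely.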
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-22 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520899#comment-16520899
 ] 

Lucas Wang commented on KAFKA-7040:
---

Hi [~apovzner]
Sorry for the delayed response.

Regarding Case 1 above, I did some testing with min.insync.replicas == 2 and 
unclean leader election DISABLED to check whether real message loss can happen
(https://github.com/apache/kafka/compare/trunk...gitlw:test_incorrect_truncation_causing_message_loss),
and it turns out it can.
At a high level: if, AFTER the truncation in step 5, broker0 becomes the leader 
again and broker1 starts fetching from broker0, which has fewer messages, then 
the truncated messages are lost forever.

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0 and broker1, and 
> two partitions, "t1p0" and "t1p1"[1], both of which have broker1 as the 
> leader and broker0 as the follower. The following sequence of events 
> happened on broker0:
> 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
> broker1 and waits for the response.
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives the response to the 
> LeaderEpoch request issued in step 1, and truncates the messages accepted in 
> step 3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to keep the replica fetcher thread from shutting down when it 
> becomes idle.
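The race above can be reduced to a toy model. The sketch below is hypothetical and greatly simplified: logs are plain lists, truncation just drops the tail, and acks=all with min.insync.replicas=2 is assumed so that an acknowledged write exists on both brokers. It only illustrates why acting on an end offset obtained before step 2 drops messages accepted in step 3, and how the follow-up scenario from the comment makes the loss permanent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model of the KAFKA-7040 race; not Kafka's replication code.
final class StaleTruncationSketch {

    // Drop every entry at or beyond endOffset.
    static void truncateTo(List<String> log, int endOffset) {
        while (log.size() > endOffset) {
            log.remove(log.size() - 1);
        }
    }

    static List<String> simulate() {
        List<String> broker0 = new ArrayList<>(Arrays.asList("m0", "m1"));
        List<String> broker1 = new ArrayList<>(Arrays.asList("m0", "m1"));

        // Step 1: the LeaderEpoch answer (broker1's end offset, 2) is fixed now,
        // but only reaches broker0's fetcher in step 5.
        int staleEndOffset = broker1.size();

        // Steps 2-3: broker0 leads t1p0 and accepts m2; with acks=all and
        // min.insync.replicas=2, m2 is acknowledged only once broker1 has it too.
        broker0.add("m2");
        broker1.add("m2");

        // Steps 4-5: broker1 leads again, and the stale response makes broker0 drop m2.
        truncateTo(broker0, staleEndOffset);

        // Follow-up from the comment: broker0 becomes leader again before re-fetching m2,
        // and broker1 (now the follower) truncates to the shorter leader log.
        truncateTo(broker1, broker0.size());
        return broker1;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints [m0, m1]: the acknowledged m2 is gone
    }
}
```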



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init

2018-06-22 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520844#comment-16520844
 ] 

Ted Yu commented on KAFKA-7088:
---

Lukasz:
I was looking at the code that processes queued TxnRequestHandlers.

Could you turn on DEBUG logging and reproduce this scenario?
Some logs are at TRACE level:
{code}
if (maybeTerminateRequestWithError(nextRequestHandler)) {
log.trace("Not sending transactional request {} because we are in an error state",
{code}
That said, the log file would probably be chatty once TRACE is turned on.

> Kafka streams thread waits infinitely on transaction init
> -
>
> Key: KAFKA-7088
> URL: https://issues.apache.org/jira/browse/KAFKA-7088
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
> Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 
> 20:07:43 UTC 2018 
> kafka-streams (client) 1.0.1
> kafka broker 1.1.0
> Java version:
> OpenJDK Runtime Environment (build 1.8.0_171-b10)
> OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)
> kafka config overrides:
> num.stream.threads: 6
> session.timeout.ms: 1
> request.timeout.ms: 11000
> fetch.max.wait.ms: 500
> max.poll.records: 1000
> topic has 24 partitions
>Reporter: Lukasz Gluchowski
>Priority: Major
>  Labels: eos
>
> A Kafka Streams application thread stops processing without any feedback. The 
> topic has 24 partitions and I noticed that processing stopped only for some 
> partitions. I will describe what happened to partition:10. The application is 
> still running (now for about 8 hours), that thread is hanging there, and no 
> rebalancing has taken place.
> There is no error (we have a custom `Thread.UncaughtExceptionHandler` which 
> was not called). I noticed that after a couple of minutes the stream stopped 
> processing (at offset 32606948, where log-end-offset is 33472402). 
> The broker itself is not reporting any active consumer in that consumer group, and 
> the only info I was able to gather was from a thread dump:
> {code:java}
> "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33"
>  #113 prio=5 os_prio=0 tid=0x7fe07c6349b0 nid=0xf7a waiting on condition 
> [0x7fe0215d4000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0xfec6a2f8> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at 
> org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:151)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> at 
> 

[jira] [Commented] (KAFKA-7091) AdminClient should handle FindCoordinatorResponse errors

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520684#comment-16520684
 ] 

ASF GitHub Bot commented on KAFKA-7091:
---

omkreddy opened a new pull request #5278: KAFKA-7091: AdminClient should handle 
FindCoordinatorResponse errors
URL: https://github.com/apache/kafka/pull/5278
 
 
   - Remove Scala AdminClient usage from core and streams tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> AdminClient should handle FindCoordinatorResponse errors
> 
>
> Key: KAFKA-7091
> URL: https://issues.apache.org/jira/browse/KAFKA-7091
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.0
>
>
> Currently, the KafkaAdminClient.deleteConsumerGroups and listConsumerGroupOffsets 
> methods ignore FindCoordinatorResponse errors. We should handle these 
> errors.



[jira] [Created] (KAFKA-7091) AdminClient should handle FindCoordinatorResponse errors

2018-06-22 Thread Manikumar (JIRA)
Manikumar created KAFKA-7091:


 Summary: AdminClient should handle FindCoordinatorResponse errors
 Key: KAFKA-7091
 URL: https://issues.apache.org/jira/browse/KAFKA-7091
 Project: Kafka
  Issue Type: Improvement
Reporter: Manikumar
Assignee: Manikumar
 Fix For: 2.0.0


Currently, the KafkaAdminClient.deleteConsumerGroups and listConsumerGroupOffsets 
methods ignore FindCoordinatorResponse errors. We should handle these 
errors.
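The gist of the requested change can be sketched as follows: a coordinator lookup error either triggers a retry or fails the caller's future, rather than being silently ignored. Everything here is hypothetical; the enum constants borrow the names of Kafka error codes, but the classes, the retry decision, and the use of `CompletableFuture` are illustrative assumptions, not the AdminClient's internals.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of handling a FindCoordinator error instead of ignoring it.
final class FindCoordinatorHandlingSketch {

    // Illustrative subset of error codes; names mirror Kafka's, the enum does not.
    enum CoordinatorError { NONE, COORDINATOR_NOT_AVAILABLE, GROUP_AUTHORIZATION_FAILED }

    static final class CoordinatorResponse {
        final CoordinatorError error;
        final int nodeId;

        CoordinatorResponse(CoordinatorError error, int nodeId) {
            this.error = error;
            this.nodeId = nodeId;
        }
    }

    // Returns true if the caller should retry the coordinator lookup.
    static boolean handle(CoordinatorResponse response, CompletableFuture<Integer> result) {
        switch (response.error) {
            case NONE:
                result.complete(response.nodeId);   // coordinator found
                return false;
            case COORDINATOR_NOT_AVAILABLE:
                return true;                        // retriable: look it up again
            default:
                // Fatal: surface the error to the caller instead of dropping it.
                result.completeExceptionally(
                    new IllegalStateException("FindCoordinator failed: " + response.error));
                return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        boolean retry = handle(
            new CoordinatorResponse(CoordinatorError.GROUP_AUTHORIZATION_FAILED, -1), result);
        System.out.println(retry + " " + result.isCompletedExceptionally()); // prints false true
    }
}
```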



[jira] [Commented] (KAFKA-6971) Passing in help flag to kafka-console-producer should print arg options

2018-06-22 Thread raghavan chockalingam (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520628#comment-16520628
 ] 

raghavan chockalingam commented on KAFKA-6971:
--

With Kafka version {{2.11-1.1.0}}, both commands ({{kafka-console-consumer.sh}} and 
{{kafka-console-producer.sh}}) in the {{tools}} directory print all the options when 
no arguments are passed. With the {{--help}} switch, however, only 
{{kafka-console-producer.sh}} fails to print the options, unless I have missed something.


> Passing in help flag to kafka-console-producer should print arg options
> ---
>
> Key: KAFKA-6971
> URL: https://issues.apache.org/jira/browse/KAFKA-6971
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, producer 
>Affects Versions: 1.1.0
>Reporter: Yeva Byzek
>Priority: Minor
>  Labels: newbie
>
> {{kafka-console-consumer --help}} prints "help is not a recognized option" as 
> well as output of options
> {{kafka-console-producer --help}} prints "help is not a recognized option" 
> but no output of options
> Possible solutions:
> (a) Enhance {{kafka-console-producer}} to also print out all options when a 
> user passes in an unrecognized option
> (b) Enhance both {{kafka-console-producer}} and {{kafka-console-consumer}} to 
> legitimately accept the {{--help}} flag
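Option (b) amounts to treating `--help` as a recognized flag that prints the usage text, while an unrecognized option prints an error followed by the same usage text. A hypothetical Java sketch; the option list, usage string, and `check` helper are illustrative, and this is not the console producer's actual argument parser.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of option (b): --help is a real flag that prints usage.
final class HelpFlagSketch {

    static final List<String> KNOWN = Arrays.asList("--broker-list", "--topic", "--help");
    static final String USAGE =
        "Usage: kafka-console-producer --broker-list <hosts> --topic <topic>";

    // Returns the text the tool should print, or null to proceed normally.
    static String check(String[] args) {
        for (String arg : args) {
            if (arg.equals("--help")) {
                return USAGE;                       // help requested explicitly
            }
            if (arg.startsWith("--") && !KNOWN.contains(arg)) {
                // Unrecognized option: report it, then still show the options.
                return arg.substring(2) + " is not a recognized option\n" + USAGE;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String out = check(args);
        if (out != null) {
            System.out.println(out);
        }
    }
}
```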



[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init

2018-06-22 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520463#comment-16520463
 ] 

Guozhang Wang commented on KAFKA-7088:
--

If you did not override the MAX_BLOCK_MS_CONFIG value, this call should block 
for at most 60 seconds (the default {{max.block.ms}}). It is not clear why you 
observed the thread hanging on this call forever.
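The thread dump shows the stream thread parked in `CountDownLatch.await()` under `TransactionalRequestResult.await()`. The sketch below contrasts an unbounded latch wait, which parks until another thread counts the latch down, with a bounded wait that gives up after a `maxBlockMs`-style deadline. `BoundedAwaitSketch` is a hypothetical class, not Kafka's `TransactionalRequestResult`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: unbounded vs. bounded wait on a pending request result.
final class BoundedAwaitSketch {

    private final CountDownLatch latch = new CountDownLatch(1);

    // Called by whichever thread receives the response.
    void complete() {
        latch.countDown();
    }

    // Unbounded: if complete() is never called, the caller stays parked forever,
    // which matches the WAITING (parking) state in the dump.
    void awaitForever() throws InterruptedException {
        latch.await();
    }

    // Bounded: fails with a TimeoutException once maxBlockMs has elapsed.
    void await(long maxBlockMs) throws InterruptedException, TimeoutException {
        if (!latch.await(maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Not completed within " + maxBlockMs + " ms");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedAwaitSketch pending = new BoundedAwaitSketch();
        try {
            pending.await(100);
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```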


[jira] [Comment Edited] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init

2018-06-22 Thread Lukasz Gluchowski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520396#comment-16520396
 ] 

Lukasz Gluchowski edited comment on KAFKA-7088 at 6/22/18 2:55 PM:
---

[~yuzhih...@gmail.com] It was set to EXACTLY_ONCE. I switched to "at least 
once" and the problem went away.


was (Author: lgluchowski):
I switched from "exactly once" to "at least once" and the problem went away.


[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init

2018-06-22 Thread Lukasz Gluchowski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520396#comment-16520396
 ] 

Lukasz Gluchowski commented on KAFKA-7088:
--

I switched from "exactly once" to "at least once" and the problem went away.
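A sketch of this workaround, assuming configuration is passed as plain `java.util.Properties`: set `processing.guarantee` to `at_least_once` instead of `exactly_once`, and optionally make the producer's `max.block.ms` explicit through the `producer.` prefix. Raw config-name strings are used so the snippet needs no Kafka jar; required settings such as `application.id` and `bootstrap.servers` are omitted, and `build` is a hypothetical helper.

```java
import java.util.Properties;

// Hypothetical helper building the Streams settings discussed in this thread.
final class StreamsGuaranteeConfigSketch {

    static Properties build(boolean exactlyOnce, long producerMaxBlockMs) {
        Properties props = new Properties();
        props.put("processing.guarantee", exactlyOnce ? "exactly_once" : "at_least_once");
        // The "producer." prefix routes a setting to the producers Streams creates;
        // StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG) builds the same key.
        props.put("producer.max.block.ms", Long.toString(producerMaxBlockMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(false, 60_000L);
        System.out.println(props.getProperty("processing.guarantee")); // prints at_least_once
    }
}
```

This trades the exactly-once guarantee for at-least-once delivery, so it sidesteps the hang rather than explaining it.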

> Kafka streams thread waits infinitely on transaction init
> -
>
> Key: KAFKA-7088
> URL: https://issues.apache.org/jira/browse/KAFKA-7088
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
> Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 
> 20:07:43 UTC 2018 
> kafka-streams (client) 1.0.1
> kafka broker 1.1.0
> Java version:
> OpenJDK Runtime Environment (build 1.8.0_171-b10)
> OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)
> kafka config overrides:
> num.stream.threads: 6
> session.timeout.ms: 1
> request.timeout.ms: 11000
> fetch.max.wait.ms: 500
> max.poll.records: 1000
> topic has 24 partitions
>Reporter: Lukasz Gluchowski
>Priority: Major
>  Labels: eos
>
> A Kafka Streams application thread stops processing without any feedback. The 
> topic has 24 partitions and I noticed that processing stopped only for some 
> partitions. I will describe what happened to partition:10. The application is 
> still running (now for about 8 hours), that thread is hanging there, and no 
> rebalancing has taken place.
> There is no error (we have a custom `Thread.UncaughtExceptionHandler` which 
> was not called). I noticed that after a couple of minutes the stream stopped 
> processing (at offset 32606948, where log-end-offset is 33472402). 
> The broker itself is not reporting any active consumer in that consumer group, and 
> the only info I was able to gather was from a thread dump:
> {code:java}
> "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33"
>  #113 prio=5 os_prio=0 tid=0x7fe07c6349b0 nid=0xf7a waiting on condition 
> [0x7fe0215d4000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0xfec6a2f8> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at 
> org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:151)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744){code}
>  
> I tried restarting the application once, but the situation repeated. The thread read 
> some data, committed the offset and stopped 

[jira] [Updated] (KAFKA-7079) ValueTransformerWithKeySupplier is not mentioned in the documentation

2018-06-22 Thread Hashan Gayasri Udugahapattuwa (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hashan Gayasri Udugahapattuwa updated KAFKA-7079:
-
Description: 
ValueTransformer#transform does not pass the key

The KStream#transformValues(ValueTransformerWithKeySupplier) method is not 
documented. This may lead people to use workarounds or fall back to using 
Transformer. This is especially likely if the user works through a wrapper API 
(e.g., for Scala), since the user would be checking the documentation rather 
than the API functions available in code.


*Original issue (as it might be useful as a business requirement)*

ValueTransformers' transform method doesn't pass the key to user-code. 
Reporting this as a bug since it currently requires workarounds.

Context:

I'm currently in the process of converting two stateful "*aggregate*" DSL 
operations to the Processor API, since the state of those operations is 
relatively large and serializing and deserializing it via Kryo takes 99%+ of 
CPU time (when profiled). 

Since DSL aggregations use state stores of [Bytes, Array[Byte]] even when 
using the in-memory state store, it seems like the only way to reduce the 
serialization/deserialization overhead is to convert heavy aggregates to 
*transform*s.

In my case, *ValueTransformer* seems to be the right option. However, since 
ValueTransformers' _transform_ method only exposes the _value_, I'd either have 
to pre-process the records and add the key to the value, or use *Transformer* 
instead (which is not my intent).

 

As the internal _*InternalValueTransformerWithKey*_ already has the readOnlyKey, it 
seems like a good idea to pass the key to the transform method as well, 
especially since in a stateful transformation the state store generally has to 
be queried by key.


Summary: ValueTransformerWithKeySupplier is not mentioned in the 
documentation  (was: ValueTransformer#transform does not pass the key)

> ValueTransformerWithKeySupplier is not mentioned in the documentation
> -
>
> Key: KAFKA-7079
> URL: https://issues.apache.org/jira/browse/KAFKA-7079
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
> Environment: Fedora 27
>Reporter: Hashan Gayasri Udugahapattuwa
>Priority: Major
>

[jira] [Created] (KAFKA-7090) Zookeeper client setting in server-properties

2018-06-22 Thread Christian Tramnitz (JIRA)
Christian Tramnitz created KAFKA-7090:
-

 Summary: Zookeeper client setting in server-properties
 Key: KAFKA-7090
 URL: https://issues.apache.org/jira/browse/KAFKA-7090
 Project: Kafka
  Issue Type: New Feature
  Components: config, documentation
Reporter: Christian Tramnitz


There are several Zookeeper client settings that may be used to connect to ZK.

Currently, it seems that only a few zookeeper.* settings are supported in Kafka's 
server.properties file. Wouldn't it make sense to support all ZooKeeper client 
settings there? If not, where would they need to go?

For example, to use ZooKeeper 3.5 with TLS enabled, the following properties are 
required:

zookeeper.clientCnxnSocket
zookeeper.client.secure
zookeeper.ssl.keyStore.location
zookeeper.ssl.keyStore.password
zookeeper.ssl.trustStore.location
zookeeper.ssl.trustStore.password

It's possible to pass them as "-D" system properties, but, especially for the 
keystore password, I'd be more comfortable having them in the properties file 
rather than visible in the process list.
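As a workaround sketch only, assuming (as the "-D" remark implies) that the ZooKeeper 3.5 client picks these settings up from JVM system properties: a launcher could read them from the properties file and set them in-process with System.setProperty before the ZooKeeper client is created, so they never show up in the process list. The class and method names are hypothetical; this is not something the Kafka broker does.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: copies the ZooKeeper client TLS settings listed in this
// issue from a properties file into JVM system properties.
class ZkClientPropsBridge {
    static final List<String> ZK_CLIENT_KEYS = Arrays.asList(
        "zookeeper.clientCnxnSocket",
        "zookeeper.client.secure",
        "zookeeper.ssl.keyStore.location",
        "zookeeper.ssl.keyStore.password",
        "zookeeper.ssl.trustStore.location",
        "zookeeper.ssl.trustStore.password");

    // Exports every listed key that is present; other keys are ignored.
    // Returns the number of keys copied.
    static int exportZkClientSettings(Properties brokerProps) {
        int copied = 0;
        for (String key : ZK_CLIENT_KEYS) {
            String value = brokerProps.getProperty(key);
            if (value != null) {
                System.setProperty(key, value); // set in-process, not on the command line
                copied++;
            }
        }
        return copied;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(
            "zookeeper.connect=zk1:2182\n"
            + "zookeeper.client.secure=true\n"
            + "zookeeper.ssl.keyStore.password=changeit\n"));
        System.out.println(exportZkClientSettings(props)); // prints 2
    }
}
```

This has to run before the ZooKeeper client is constructed, since system properties set afterwards would not affect an already-created connection.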



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520172#comment-16520172
 ] 

ASF GitHub Bot commented on KAFKA-4217:
---

cadonna opened a new pull request #5273: KAFKA-4217: Add KStream.flatTransform
URL: https://github.com/apache/kafka/pull/5273
 
 
   - Adds flatTransform method in KStream
   - Adds processor supplier and processor for flatTransform
   
   This contribution is my original work and I license the work to the project 
under the project's open source license.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KStream.transform equivalent of flatMap
> ---
>
> Key: KAFKA-4217
> URL: https://issues.apache.org/jira/browse/KAFKA-4217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: api, needs-kip, newbie
>
> {{KStream.transform}} gives you access to state stores while allowing you to 
> return zero or one transformed {{KeyValue}}. Alas, it is unclear which method 
> you should use if you want to access state stores and return zero or more 
> {{KeyValue}}s. Presumably you can use {{transform}}, always return {{null}}, 
> and use {{ProcessorContext.forward}} to emit the {{KeyValue}}s.
> It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or 
> allow store access from other {{KStream}} methods, such as {{flatMap}} itself.
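The requested semantics can be sketched without a Kafka dependency. In Kafka Streams 1.x, the workaround from the description would be a Transformer whose transform calls ProcessorContext#forward once per output record and then returns null. The sketch below instead assumes a hypothetical MultiOutputTransformer interface whose transform returns an Iterable of key/value pairs (zero, one, or many per input), with a HashMap standing in for a state store; none of these names are Kafka API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical: a transform-like operator that may emit any number of records per input.
interface MultiOutputTransformer<K, V, KR, VR> {
    Iterable<Map.Entry<KR, VR>> transform(K key, V value);
}

// Splits a line into words and emits (word, running count).
// The map is an assumed stand-in for a state store.
class WordCountTransformer implements MultiOutputTransformer<String, String, String, Integer> {
    private final Map<String, Integer> store = new HashMap<>();

    @Override
    public Iterable<Map.Entry<String, Integer>> transform(String key, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (word.isEmpty()) {
                continue; // a blank line yields zero records
            }
            int count = store.merge(word, 1, Integer::sum); // read-modify-write on the "store"
            out.add(new SimpleEntry<>(word, count));
        }
        return out;
    }
}

class MultiOutputDemo {
    public static void main(String[] args) {
        WordCountTransformer t = new WordCountTransformer();
        System.out.println(t.transform("k1", "a b a")); // prints [a=1, b=1, a=2]
        System.out.println(t.transform("k2", "   "));   // prints []
    }
}
```

Returning a collection keeps the zero-or-many contract visible in the signature, whereas the forward-and-return-null workaround hides it behind a side effect.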





[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init

2018-06-22 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520094#comment-16520094
 ] 

Ted Yu commented on KAFKA-7088:
---

What's the value of "processing.guarantee"?
{code}
this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
{code}
KafkaProducer.initTransactions is only called when eosEnabled is true.
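The check quoted above can be mirrored with plain java.util.Properties to see what a given configuration yields. This minimal sketch hard-codes "processing.guarantee" and the values "at_least_once" (the default) and "exactly_once", which are the strings behind StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE and StreamsConfig.EXACTLY_ONCE in Kafka 1.x; the EosCheck class itself is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper mirroring the eosEnabled check in Kafka Streams.
class EosCheck {
    static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
    static final String AT_LEAST_ONCE = "at_least_once"; // default when unset
    static final String EXACTLY_ONCE = "exactly_once";

    // True only when exactly-once is configured, i.e. when initTransactions would be called.
    static boolean eosEnabled(Properties streamsProps) {
        String guarantee = streamsProps.getProperty(PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE);
        return EXACTLY_ONCE.equals(guarantee);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(eosEnabled(props)); // prints false
        props.setProperty(PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
        System.out.println(eosEnabled(props)); // prints true
    }
}
```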

> Kafka streams thread waits infinitely on transaction init
> -
>
> Key: KAFKA-7088
> URL: https://issues.apache.org/jira/browse/KAFKA-7088
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
> Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 
> 20:07:43 UTC 2018 
> kafka-streams (client) 1.0.1
> kafka broker 1.1.0
> Java version:
> OpenJDK Runtime Environment (build 1.8.0_171-b10)
> OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)
> kafka config overrides:
> num.stream.threads: 6
> session.timeout.ms: 1
> request.timeout.ms: 11000
> fetch.max.wait.ms: 500
> max.poll.records: 1000
> topic has 24 partitions
>Reporter: Lukasz Gluchowski
>Priority: Major
>  Labels: eos
>
> A Kafka Streams application thread stops processing without any feedback. The 
> topic has 24 partitions and I noticed that processing stopped only for some 
> partitions. I will describe what happened to partition 10. The application is 
> still running (now for about 8 hours), that thread is still hanging, and no 
> rebalancing took place.
> There is no error (we have a custom `Thread.UncaughtExceptionHandler`, which 
> was not called). I noticed that after a couple of minutes the stream stopped 
> processing (at offset 32606948, where the log-end offset is 33472402). 
> The broker itself is not reporting any active consumer in that consumer group, and 
> the only info I was able to gather was from a thread dump:
> {code:java}
> "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33"
>  #113 prio=5 os_prio=0 tid=0x7fe07c6349b0 nid=0xf7a waiting on condition 
> [0x7fe0215d4000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0xfec6a2f8> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at 
> org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:151)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> at 
> 

[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init

2018-06-22 Thread Lukasz Gluchowski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520069#comment-16520069
 ] 

Lukasz Gluchowski commented on KAFKA-7088:
--

[~guozhang] thank you for your response. The producer is created automatically 
by Kafka Streams (that's why I categorized it as a Streams issue).

I didn't override the {{ProducerConfig.MAX_BLOCK_MS_CONFIG}} property, so we rely 
on the default value (FYI, all config properties that we explicitly set are listed 
in the "Environment" section of this ticket).

The broker itself is healthy and we didn't experience any network issues when this 
error happened. We are running multiple Kafka Streams applications and the vast 
majority of the clients have zero lag. In case it helps, the broker logs said 
that the consumer group is empty.

Note that this situation is repeatable. I restarted the application and the 
same situation occurred after ~15 minutes.
{code:java}
kgroups --describe --group mp_ads_publisher_pro_madstorage-web-corotos-prod
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'mp_ads_publisher_pro_madstorage-web-corotos-prod' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
madstorage-web-corotos-prod 10 32606948 33472402 865454 - - -
. (other 23 partitions){code}
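As a worked check of the output above, a minimal sketch: the LAG column is LOG-END-OFFSET minus CURRENT-OFFSET for each partition, so for partition 10 it is 33472402 - 32606948 = 865454 offsets behind the end of the log. The PartitionLag class is a hypothetical illustration, not part of the consumer-groups tool.

```java
// Hypothetical illustration: per-partition lag as shown by the consumer-groups tool,
// i.e. the log-end offset minus the group's committed offset.
class PartitionLag {
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        // Values for partition 10 from the output above.
        System.out.println(lag(32606948L, 33472402L)); // prints 865454
    }
}
```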
 

Let me know if you need more info,

Thanks
