[jira] [Commented] (KAFKA-6978) Make Streams Window retention time strict
    [ https://issues.apache.org/jira/browse/KAFKA-6978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520915#comment-16520915 ]

ASF GitHub Bot commented on KAFKA-6978:
---------------------------------------

guozhangwang closed pull request #5218: KAFKA-6978: make window retention time strict
URL: https://github.com/apache/kafka/pull/5218

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index a7747629948..4be1880083f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
 import org.slf4j.Logger;
@@ -32,7 +33,7 @@
 import java.util.Map;
 public class KStreamWindowAggregate implements KStreamAggProcessorSupplier, V, T> {
-    private static final Logger LOG = LoggerFactory.getLogger(KStreamWindowAggregate.class);
+    private final Logger log = LoggerFactory.getLogger(getClass());
     private final String storeName;
     private final Windows windows;
@@ -66,11 +67,14 @@ public void enableSendingOldValues() {
         private WindowStore windowStore;
         private TupleForwarder, T> tupleForwarder;
         private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            this.internalProcessorContext = (InternalProcessorContext) context;
+
             metrics = (StreamsMetricsImpl) context.metrics();
             windowStore = (WindowStore) context.getStateStore(storeName);
@@ -82,7 +86,7 @@ public void process(final K key, final V value) {
             // if the key is null, we do not need proceed aggregating the record
             // the record with the table
             if (key == null) {
-                LOG.warn(
+                log.warn(
                     "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                     value, context().topic(), context().partition(), context().offset()
                 );
@@ -92,21 +96,32 @@ public void process(final K key, final V value) {
             // first get the matching windows
             final long timestamp = context().timestamp();
+            final long expiryTime = internalProcessorContext.streamTime() - windows.maintainMs();
+
             final Map matchedWindows = windows.windowsFor(timestamp);
             // try update the window, and create the new window for the rest of unmatched window that do not exist yet
             for (final Map.Entry entry : matchedWindows.entrySet()) {
-                T oldAgg = windowStore.fetch(key, entry.getKey());
-
-                if (oldAgg == null) {
-                    oldAgg = initializer.apply();
+                final Long windowStart = entry.getKey();
+                if (windowStart > expiryTime) {
+                    T oldAgg = windowStore.fetch(key, windowStart);
+
+                    if (oldAgg == null) {
+                        oldAgg = initializer.apply();
+                    }
+
+                    final T newAgg = aggregator.apply(key, value, oldAgg);
+
+                    // update the store with the new value
+                    windowStore.put(key, newAgg, windowStart);
+                    tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
+                } else {
+                    log.warn(
+                        "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{}] expiration=[{}]",
+                        key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, expiryTime
+                    );
+                    metrics.skippedRecordsSensor().record();
                 }
-
-                final T newAgg = aggregator.apply(key, value, oldAgg);
-
-                // update the store with the new value
-                windowStore.put(key, newAgg,
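The retention check added by this diff can be looked at in isolation. The following is a minimal sketch, not the Kafka code: the class and method names are hypothetical, and only the comparison `windowStart > streamTime - maintainMs` is taken from the diff above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone model of the strict-retention check; not part of Kafka.
final class WindowExpirySketch {

    // Mirrors the diff: a window is still updated only if it starts after the expiry time.
    static boolean isLive(final long windowStart, final long streamTime, final long maintainMs) {
        final long expiryTime = streamTime - maintainMs;
        return windowStart > expiryTime;
    }

    // Returns the window start times that would still be updated; the rest would be skipped.
    static List<Long> liveWindows(final List<Long> windowStarts, final long streamTime, final long maintainMs) {
        final List<Long> live = new ArrayList<>();
        for (final long start : windowStarts) {
            if (isLive(start, streamTime, maintainMs)) {
                live.add(start);
            }
        }
        return live;
    }

    public static void main(final String[] args) {
        // streamTime = 10_000 and maintainMs = 3_000 give expiryTime = 7_000,
        // so a window starting exactly at 7_000 is already treated as expired.
        System.out.println(liveWindows(Arrays.asList(6_000L, 7_000L, 8_000L), 10_000L, 3_000L));
    }
}
```

Note that the comparison is strict, so a window whose start equals the expiry time falls into the "skipped" branch.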
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
    [ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520899#comment-16520899 ]

Lucas Wang commented on KAFKA-7040:
-----------------------------------

Hi [~apovzner] Sorry for the delayed response.
Regarding Case 1 above, I did some testing with min.insync.replicas == 2 and unclean leader election DISABLED to check whether real message loss can happen
https://github.com/apache/kafka/compare/trunk...gitlw:test_incorrect_truncation_causing_message_loss ,
and it turns out the answer is yes.
At a high level, if broker0 becomes the leader again AFTER the truncation in step 5, and broker1 starts fetching from broker0, which has fewer messages, then the message is lost forever.

> The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7040
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7040
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lucas Wang
>            Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0 and broker1, and two partitions, "t1p0" and "t1p1"[1], both of which have broker1 as the leader and broker0 as the follower. The following sequence of events happened on broker0:
> 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to broker1 and waits for the response.
> 2. A LeaderAndISR request causes broker0 to become the leader for one partition, t1p0, which in turn removes the partition t1p0 from the replica fetcher thread.
> 3. Broker0 accepts some messages from a producer.
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader and broker0 to become the follower for partition t1p0. This causes the partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the LeaderEpoch request issued in step 1, and truncates the messages accepted in step 3.
> The issue can be reproduced with the test from https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead of just one, to avoid the shutting down of the replica fetcher thread when it becomes idle.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
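The five steps in the problem statement can be replayed with a toy model. This is only a sketch under stated assumptions: none of the types below exist in Kafka, the log is a plain list, and the `fetcherGeneration` counter is a hypothetical guard added for illustration, not the fix adopted by the project.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the race in steps 1-5; none of these types exist in Kafka.
final class StaleTruncationSketch {
    final List<String> log = new ArrayList<>();
    // Hypothetical counter, bumped whenever the partition is re-added to the fetcher.
    int fetcherGeneration = 0;

    // Applies an epoch response and returns how many messages were truncated.
    int onEpochResponse(final int leaderEndOffset, final int requestGeneration, final boolean checkGeneration) {
        if (checkGeneration && requestGeneration != fetcherGeneration) {
            return 0; // response belongs to a request sent before the leadership changes: ignore it
        }
        int removed = 0;
        while (log.size() > leaderEndOffset) {
            log.remove(log.size() - 1);
            removed++;
        }
        return removed;
    }

    // Replays the sequence on broker0 and returns the number of truncated messages.
    static int run(final boolean checkGeneration) {
        final StaleTruncationSketch broker0 = new StaleTruncationSketch();
        broker0.log.add("m0");
        broker0.log.add("m1");
        final int requestGeneration = broker0.fetcherGeneration; // step 1: request is in flight
        final int leaderEndOffset = 2;                           // end offset broker1 will report
        broker0.log.add("m2");                                   // steps 2-3: broker0 leads and
        broker0.log.add("m3");                                   // accepts two producer messages
        broker0.fetcherGeneration++;                             // step 4: partition re-added to the fetcher
        return broker0.onEpochResponse(leaderEndOffset, requestGeneration, checkGeneration); // step 5
    }

    public static void main(final String[] args) {
        System.out.println("truncated without guard: " + run(false));
        System.out.println("truncated with guard:    " + run(true));
    }
}
```

Without the guard, the late response removes the two messages accepted in step 3; with it, the stale response is dropped and the log is left alone.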
[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init
    [ https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520844#comment-16520844 ]

Ted Yu commented on KAFKA-7088:
-------------------------------

Lukasz:
I was looking at code related to how queued TxnRequestHandler's are processed.
Is it possible for you to turn on DEBUG logging and reproduce this scenario?
Some logs are at TRACE level:
{code}
            if (maybeTerminateRequestWithError(nextRequestHandler)) {
                log.trace("Not sending transactional request {} because we are in an error state",
{code}
But I guess the log file would be chatty once TRACE is turned on.

> Kafka streams thread waits infinitely on transaction init
> ---------------------------------------------------------
>
>                 Key: KAFKA-7088
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7088
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.0.1
>         Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 20:07:43 UTC 2018
> kafka-streams (client) 1.0.1
> kafka broker 1.1.0
> Java version:
> OpenJDK Runtime Environment (build 1.8.0_171-b10)
> OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)
> kafka config overrides:
> num.stream.threads: 6
> session.timeout.ms: 1
> request.timeout.ms: 11000
> fetch.max.wait.ms: 500
> max.poll.records: 1000
> topic has 24 partitions
>            Reporter: Lukasz Gluchowski
>            Priority: Major
>              Labels: eos
>
> A kafka stream application thread stops processing without any feedback. The topic has 24 partitions and I noticed that processing stopped only for some partitions. I will describe what happened to partition:10. The application is still running (now for about 8 hours), that thread is hanging there, and no rebalancing took place.
> There is no error (we have a custom `Thread.UncaughtExceptionHandler` which was not called). I noticed that after a couple of minutes the stream stopped processing (at offset 32606948 where log-end-offset is 33472402).
> Broker itself is not reporting any active consumer in that consumer group and the only info I was able to gather was from thread dump:
> {code:java}
> "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33" #113 prio=5 os_prio=0 tid=0x7fe07c6349b0 nid=0xf7a waiting on condition [0x7fe0215d4000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0xfec6a2f8> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>         at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
>         at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:151)
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
>         at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
>         at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
>         at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:)
>         at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
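The frames in the dump (`CountDownLatch.await` calling `acquireSharedInterruptibly`) appear to correspond to the untimed form of the latch wait, which parks until some other thread counts the latch down. The sketch below is stand-alone JDK code, not Kafka code, and the class name is hypothetical; it only contrasts that behaviour with the timed `await(timeout, unit)` overload, which gives up after a bound.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Stand-alone illustration of a bounded latch wait; not Kafka code.
final class BoundedLatchWaitSketch {

    // Waits at most timeoutMs; returns true only if the latch was released in time.
    static boolean awaitWithTimeout(final CountDownLatch latch, final long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(final String[] args) {
        final CountDownLatch neverReleased = new CountDownLatch(1);
        // Calling neverReleased.await() with no arguments would park this thread indefinitely.
        System.out.println("completed in time: " + awaitWithTimeout(neverReleased, 50L));
    }
}
```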
[jira] [Commented] (KAFKA-7091) AdminClient should handle FindCoordinatorResponse errors
    [ https://issues.apache.org/jira/browse/KAFKA-7091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520684#comment-16520684 ]

ASF GitHub Bot commented on KAFKA-7091:
---------------------------------------

omkreddy opened a new pull request #5278: KAFKA-7091: AdminClient should handle FindCoordinatorResponse errors
URL: https://github.com/apache/kafka/pull/5278

- Remove scala AdminClient usage from core and streams tests

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> AdminClient should handle FindCoordinatorResponse errors
> --------------------------------------------------------
>
>                 Key: KAFKA-7091
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7091
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Manikumar
>            Assignee: Manikumar
>            Priority: Major
>             Fix For: 2.0.0
>
> Currently KafkaAdminClient.deleteConsumerGroups, listConsumerGroupOffsets methods are ignoring FindCoordinatorResponse errors. We should handle these errors.
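To illustrate what "handling" a coordinator-lookup error could mean, here is a minimal sketch. It does not use the Kafka classes: `LookupError`, `FindCoordinatorResult` and `resolveCoordinator` are hypothetical stand-ins, and the actual change in the pull request may look quite different.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of surfacing a coordinator-lookup error; these are not the Kafka types.
final class CoordinatorErrorSketch {

    enum LookupError { NONE, COORDINATOR_NOT_AVAILABLE, GROUP_AUTHORIZATION_FAILED }

    static final class FindCoordinatorResult {
        final LookupError error;
        final int coordinatorId;

        FindCoordinatorResult(final LookupError error, final int coordinatorId) {
            this.error = error;
            this.coordinatorId = coordinatorId;
        }
    }

    // Fails the future when the lookup reported an error, instead of silently continuing.
    static CompletableFuture<Integer> resolveCoordinator(final FindCoordinatorResult result) {
        final CompletableFuture<Integer> future = new CompletableFuture<>();
        if (result.error != LookupError.NONE) {
            future.completeExceptionally(new IllegalStateException("FindCoordinator failed: " + result.error));
        } else {
            future.complete(result.coordinatorId);
        }
        return future;
    }

    public static void main(final String[] args) {
        final FindCoordinatorResult failed = new FindCoordinatorResult(LookupError.COORDINATOR_NOT_AVAILABLE, -1);
        System.out.println("failed lookup surfaces an error: " + resolveCoordinator(failed).isCompletedExceptionally());
    }
}
```

The point of the sketch is only that the caller's future carries the error, so a follow-up request is never sent to a coordinator that was not found.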
[jira] [Created] (KAFKA-7091) AdminClient should handle FindCoordinatorResponse errors
Manikumar created KAFKA-7091:
--------------------------------

             Summary: AdminClient should handle FindCoordinatorResponse errors
                 Key: KAFKA-7091
                 URL: https://issues.apache.org/jira/browse/KAFKA-7091
             Project: Kafka
          Issue Type: Improvement
            Reporter: Manikumar
            Assignee: Manikumar
             Fix For: 2.0.0

Currently KafkaAdminClient.deleteConsumerGroups, listConsumerGroupOffsets methods are ignoring FindCoordinatorResponse errors. We should handle these errors.
[jira] [Commented] (KAFKA-6971) Passing in help flag to kafka-console-producer should print arg options
    [ https://issues.apache.org/jira/browse/KAFKA-6971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520628#comment-16520628 ]

raghavan chockalingam commented on KAFKA-6971:
----------------------------------------------

With Kafka version {{2.11-1.1.0}}, both commands (kafka-console-consumer.sh, kafka-console-producer.sh) in the {{tools}} directory print all the options when no arguments are passed.

Only {{kafka-console-producer.sh}} does not print all options with the {{--help}} switch, unless I have missed something completely.

> Passing in help flag to kafka-console-producer should print arg options
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-6971
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6971
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core, producer
>    Affects Versions: 1.1.0
>            Reporter: Yeva Byzek
>            Priority: Minor
>              Labels: newbie
>
> {{kafka-console-consumer --help}} prints "help is not a recognized option" as well as output of options
> {{kafka-console-producer --help}} prints "help is not a recognized option" but no output of options
> Possible solutions:
> (a) Enhance {{kafka-console-producer}} to also print out all options when a user passes in an unrecognized option
> (b) Enhance both {{kafka-console-producer}} and {{kafka-console-consumer}} to legitimately accept the {{--help}} flag
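The two proposed solutions can be sketched with a hand-rolled argument check. This is only an illustration: the class is hypothetical, the real tools rely on an option-parsing library rather than code like this, and the option descriptions below are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hand-rolled sketch of solutions (a) and (b); the real console tools do not use this code.
final class HelpFlagSketch {
    static final Map<String, String> OPTIONS = new LinkedHashMap<>();
    static {
        OPTIONS.put("--broker-list", "placeholder description of the broker list option");
        OPTIONS.put("--topic", "placeholder description of the topic option");
        OPTIONS.put("--help", "print this usage text");
    }

    // Builds the option listing that both tools would print.
    static String usage() {
        final StringBuilder sb = new StringBuilder("Option  Description\n");
        for (final Map.Entry<String, String> option : OPTIONS.entrySet()) {
            sb.append(option.getKey()).append("  ").append(option.getValue()).append('\n');
        }
        return sb.toString();
    }

    // Returns the text the tool would print for the given arguments (empty if nothing to report).
    static String handle(final String[] args) {
        for (final String arg : args) {
            if (arg.equals("--help")) {
                return usage(); // (b): --help is a legitimate flag
            }
            if (arg.startsWith("--") && !OPTIONS.containsKey(arg)) {
                // (a): report the unknown option, then still print all options
                return arg.substring(2) + " is not a recognized option\n" + usage();
            }
        }
        return "";
    }

    public static void main(final String[] args) {
        System.out.print(handle(new String[] {"--help"}));
    }
}
```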
[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init
    [ https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520463#comment-16520463 ]

Guozhang Wang commented on KAFKA-7088:
--------------------------------------

If you did not override the MAX_BLOCK_MS_CONFIG value, then this call should be blocked for at most 60 seconds. It is not clear why you observed the thread hanging on this call forever.
[jira] [Comment Edited] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init
    [ https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520396#comment-16520396 ]

Lukasz Gluchowski edited comment on KAFKA-7088 at 6/22/18 2:55 PM:
-------------------------------------------------------------------

[~yuzhih...@gmail.com] It was set to EXACTLY_ONCE. I switched to "at least once" and the problem went away.

was (Author: lgluchowski):
I switched from "exactly once" to "at least once" and the problem went away.
[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init
[ https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520396#comment-16520396 ] Lukasz Gluchowski commented on KAFKA-7088: -- I switched from "exactly once" to "at least once" and the problem went away. > Kafka streams thread waits infinitely on transaction init > - > > Key: KAFKA-7088 > URL: https://issues.apache.org/jira/browse/KAFKA-7088 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1 > Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 > 20:07:43 UTC 2018 > kafka-streams (client) 1.0.1 > kafka broker 1.1.0 > Java version: > OpenJDK Runtime Environment (build 1.8.0_171-b10) > OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode) > kakfa config overrides: > num.stream.threads: 6 > session.timeout.ms: 1 > request.timeout.ms: 11000 > fetch.max.wait.ms: 500 > max.poll.records: 1000 > topic has 24 partitions >Reporter: Lukasz Gluchowski >Priority: Major > Labels: eos > > A kafka stream application thread stops processing without any feedback. The > topic has 24 partitions and I noticed that processing stopped only for some > partitions. I will describe what happened to partition:10. The application is > still running (now for about 8 hours) and that thread is hanging there and no > rebalancing that took place. > There is no error (we have a custom `Thread.UncaughtExceptionHandler` which > was not called). I noticed that after couple of minutes stream stopped > processing (at offset 32606948 where log-end-offset is 33472402). 
> Broker itself is not reporting any active consumer in that consumer group and > the only info I was able to gather was from thread dump: > {code:java} > "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33" > #113 prio=5 os_prio=0 tid=0x7fe07c6349b0 nid=0xf7a waiting on condition > [0x7fe0215d4000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xfec6a2f8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50) > at > org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:151) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350) > at > org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88) > at > 
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744){code} > > I tried restarting application once but the situation repeated. Thread read > some data, committed offset and stopped
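The workaround mentioned in the comment (switching from "exactly once" to "at least once") is a single Streams configuration entry. The sketch below builds the configuration with plain string keys so that it has no Kafka dependency; the helper class is hypothetical, and the application id and bootstrap address are placeholders.

```java
import java.util.Properties;

// Builds a Streams-style configuration using plain string keys; the helper itself is hypothetical.
final class GuaranteeConfigSketch {

    static Properties streamsProps(final boolean exactlyOnce) {
        final Properties props = new Properties();
        props.setProperty("application.id", "example-app");       // placeholder value
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder value
        // With exactly_once, task creation goes through the transactional producer
        // initialization seen in the thread dump; at_least_once skips that step.
        props.setProperty("processing.guarantee", exactlyOnce ? "exactly_once" : "at_least_once");
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(streamsProps(false).getProperty("processing.guarantee"));
    }
}
```

This only sidesteps the hang; it does not explain it, and it gives up the transactional guarantees that the `eos` label refers to.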
[jira] [Updated] (KAFKA-7079) ValueTransformerWithKeySupplier is not mentioned in the documentation
     [ https://issues.apache.org/jira/browse/KAFKA-7079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hashan Gayasri Udugahapattuwa updated KAFKA-7079:
-------------------------------------------------
    Description: 
ValueTransformer#transform does not pass the key

KStream#transformValues(ValueTransformerWithKeySupplier . method is not documented. It might lead people to use workarounds or fall back to using Transformer. This is very likely if the user is using a wrapper API (e.g. for Scala), as the user would be checking the documentation more than the available API functions in code.

*Original issue (as it might be useful as a business requirement)*

ValueTransformers' transform method doesn't pass the key to user-code. Reporting this as a bug since it currently requires workarounds.

Context:

I'm currently in the process of converting two stateful "*aggregate*" DSL operations to the Processor API, since the state of those operations is relatively large and serializing and deserializing it via Kryo takes 99%+ of CPU time (when profiled).

Since DSL aggregations use state stores of [Bytes, Array[Byte]] even when using the in-memory state store, it seems like the only way to reduce the serialization/deserialization overhead is to convert heavy aggregates to *transform*s.

In my case, *ValueTransformer* seems to be the option. However, since ValueTransformers' _transform_ method only exposes the _value_, I'd either have to pre-process and add the key to the value or use *Transformer* instead (which is not my intent).

As the internal _*InternalValueTransformerWithKey*_ already has the readOnlyKey, it seems like a good idea to pass the key to the transform method as well, especially since in a stateful transformation the state store generally has to be queried by the key.

  was:
ValueTransformers' transform method doesn't pass the key to user-code. Reporting this as a bug since it currently requires workarounds.

Context:

I'm currently in the process of converting two stateful "*aggregate*" DSL operations to the Processor API since the state of those operations are relatively large and takes 99% + of CPU time (when profiled) for serializing and deserializing them via Kryo.

Since DSL aggregations use state stores of [Bytes, Array[Byte]]] even when using the in-memory state store, it seems like the only way to reduce the serialization/deserialization overhead is to convert heavy aggregates to *transform*s.

In my case, *ValueTransformer* seems to be the option. However, since ValueTransformers' _transform_ method only exposes the _value_, I'd either have to pre-process and add the key to the value or use *Transformer* instead (which is not my intent).

As internal _*InternalValueTransformerWithKey*_ already has the readOnlyKey, it seems like a good idea to pass the key to the transform method as well, esp since in a stateful transformation, generally the state store has to be queried by the key.

        Summary: ValueTransformerWithKeySupplier is not mentioned in the documentation  (was: ValueTransformer#transform does not pass the key)

> ValueTransformerWithKeySupplier is not mentioned in the documentation
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7079
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7079
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>         Environment: Fedora 27
>            Reporter: Hashan Gayasri Udugahapattuwa
>            Priority: Major
[jira] [Created] (KAFKA-7090) Zookeeper client setting in server-properties
Christian Tramnitz created KAFKA-7090:
-----------------------------------------

             Summary: Zookeeper client setting in server-properties
                 Key: KAFKA-7090
                 URL: https://issues.apache.org/jira/browse/KAFKA-7090
             Project: Kafka
          Issue Type: New Feature
          Components: config, documentation
            Reporter: Christian Tramnitz

There are several Zookeeper client settings that may be used to connect to ZK. Currently, it seems that only very few zookeeper.* settings are supported in Kafka's server.properties file. Wouldn't it make sense to support all Zookeeper client settings there, or where would they need to go?

For example, to use Zookeeper 3.5 with TLS enabled, the following properties are required:

zookeeper.clientCnxnSocket
zookeeper.client.secure
zookeeper.ssl.keyStore.location
zookeeper.ssl.keyStore.password
zookeeper.ssl.trustStore.location
zookeeper.ssl.trustStore.password

It's obviously possible to pass them through "-D", but especially for the keystore password, I'd be more comfortable with this sitting in the properties file than being visible in the process list...

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
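
To make the request in KAFKA-7090 concrete, here is a minimal sketch of the kind of pass-through the reporter seems to be asking for: read a properties file, pick out the zookeeper.* entries, and publish them as JVM system properties instead of putting them on the command line with "-D". This is not how Kafka handles server.properties; the class name, the helper names, and all values (paths, password) are made up for illustration, and the sketch assumes the Zookeeper client reads these names from system properties, as the "-D" remark in the report implies.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class ZkClientPropsSketch {

    /** Returns a copy containing only the entries whose key starts with "zookeeper.". */
    static Properties zookeeperEntries(Properties all) {
        Properties zk = new Properties();
        for (String name : all.stringPropertyNames()) {
            if (name.startsWith("zookeeper.")) {
                zk.setProperty(name, all.getProperty(name));
            }
        }
        return zk;
    }

    /** Publishes every entry as a JVM system property (the in-process equivalent of -Dname=value). */
    static void exportToSystemProperties(Properties props) {
        for (String name : props.stringPropertyNames()) {
            System.setProperty(name, props.getProperty(name));
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the contents of a properties file; all values are placeholders.
        String fileContents = String.join("\n",
                "broker.id=0",
                "zookeeper.client.secure=true",
                "zookeeper.ssl.keyStore.location=/path/to/keystore.jks",
                "zookeeper.ssl.keyStore.password=placeholder-password");

        Properties all = new Properties();
        all.load(new StringReader(fileContents));

        exportToSystemProperties(zookeeperEntries(all));

        // The password never appears in the process list, only in the file and in memory.
        System.out.println(System.getProperty("zookeeper.client.secure")); // prints "true"
    }
}
```

Note that a prefix filter like this would also copy settings such as zookeeper.connect, which is harmless here but shows why an explicit list of supported names may be preferable.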
[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap
[ https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520172#comment-16520172 ]

ASF GitHub Bot commented on KAFKA-4217:
---------------------------------------

cadonna opened a new pull request #5273: KAFKA-4217: Add KStream.flatTransform
URL: https://github.com/apache/kafka/pull/5273

- Adds flatTransform method in KStream
- Adds processor supplier and processor for flatTransform

This contribution is my original work and I license the work to the project under the project's open source license.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> KStream.transform equivalent of flatMap
> ---------------------------------------
>
>                 Key: KAFKA-4217
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4217
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.1
>            Reporter: Elias Levy
>            Assignee: Bruno Cadonna
>            Priority: Major
>              Labels: api, needs-kip, newbie
>
> {{KStream.transform}} gives you access to state stores while allowing you to
> return zero or one transformed {{KeyValue}}. Alas, it is unclear what method
> you should use if you want to access state stores and return zero or multiple
> {{KeyValue}}. Presumably you can use {{transform}}, always return {{null}},
> and use {{ProcessorContext.forward}} to emit {{KeyValues}}.
> It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or
> allow store access from other {{KStream}} methods, such as {{flatMap}} itself.
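
The workaround described in KAFKA-4217 (always return null from transform and emit records through forward) can be sketched without the Kafka dependency. In the sketch below, `Forwarder` is a hypothetical stand-in for the forwarding part of ProcessorContext, and `transform` is a plain static method rather than an implementation of the real Transformer interface; it only illustrates the "zero or many outputs via forward, null as the return value" shape and leaves out state stores entirely.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ForwardManySketch {

    /** Hypothetical stand-in for the forward(key, value) call on a processor context. */
    interface Forwarder<K, V> {
        void forward(K key, V value);
    }

    /**
     * Splits the value into whitespace-separated words and forwards one record per word.
     * Always returns null, mirroring the workaround in which the return value is unused.
     */
    static Map.Entry<String, String> transform(String key, String value,
                                               Forwarder<String, String> context) {
        for (String word : value.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                context.forward(key, word);
            }
        }
        return null; // nothing is emitted through the return value
    }

    /** Runs transform once and collects everything that was forwarded. */
    static List<Map.Entry<String, String>> run(String key, String value) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        transform(key, value, (k, v) -> out.add(new SimpleEntry<>(k, v)));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run("k", "a b c")); // prints [k=a, k=b, k=c]
        System.out.println(run("k", "   "));   // prints []
    }
}
```

A dedicated flatTransform, as proposed in the pull request, would let the method return the collection directly instead of relying on this side channel.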
[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init
[ https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520094#comment-16520094 ]

Ted Yu commented on KAFKA-7088:
-------------------------------

What's the value for "processing.guarantee"?
{code}
this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
{code}
The call to KafkaProducer.initTransactions only happens when eosEnabled is true.

> Kafka streams thread waits infinitely on transaction init
> ---------------------------------------------------------
>
>                 Key: KAFKA-7088
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7088
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.0.1
>         Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 20:07:43 UTC 2018
> kafka-streams (client) 1.0.1
> kafka broker 1.1.0
> Java version:
> OpenJDK Runtime Environment (build 1.8.0_171-b10)
> OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)
> kafka config overrides:
> num.stream.threads: 6
> session.timeout.ms: 1
> request.timeout.ms: 11000
> fetch.max.wait.ms: 500
> max.poll.records: 1000
> topic has 24 partitions
>            Reporter: Lukasz Gluchowski
>            Priority: Major
>              Labels: eos
>
> A Kafka Streams application thread stops processing without any feedback. The
> topic has 24 partitions and I noticed that processing stopped only for some
> partitions. I will describe what happened to partition 10. The application is
> still running (now for about 8 hours), that thread is hanging there, and no
> rebalancing took place.
> There is no error (we have a custom `Thread.UncaughtExceptionHandler` which
> was not called). I noticed that after a couple of minutes the stream stopped
> processing (at offset 32606948, where log-end-offset is 33472402).
> The broker itself is not reporting any active consumer in that consumer group and
> the only info I was able to gather was from a thread dump:
> {code:java}
> "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33" #113 prio=5 os_prio=0 tid=0x7fe07c6349b0 nid=0xf7a waiting on condition [0x7fe0215d4000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0xfec6a2f8> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
> at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554)
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:151)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
> at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
> at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
> at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> at
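
The stack trace in the KAFKA-7088 report shows the stream thread parked in the untimed CountDownLatch.await() (the frames go through doAcquireSharedInterruptibly rather than a timed acquire), called from KafkaProducer.initTransactions. The following is a minimal JDK-only sketch, not Kafka code, of the difference between that untimed wait and a bounded one; the class and method names are invented for illustration, and whether a bounded wait is the right fix for this ticket is an open question rather than something the thread establishes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {

    /**
     * Waits for the latch for at most timeoutMs milliseconds.
     * Returns true if the latch reached zero in time, false on timeout or interruption.
     */
    static boolean awaitBounded(CountDownLatch latch, long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        // A latch nobody counts down models a response that never arrives.
        // Calling neverCompleted.await() here would park the thread indefinitely,
        // which is the state shown in the thread dump.
        CountDownLatch neverCompleted = new CountDownLatch(1);
        System.out.println(awaitBounded(neverCompleted, 100)); // prints "false" after the timeout

        // A latch that has already been counted down returns immediately.
        CountDownLatch completed = new CountDownLatch(1);
        completed.countDown();
        System.out.println(awaitBounded(completed, 100)); // prints "true"
    }
}
```

With a bounded wait the caller gets a chance to log, retry, or fail the task, whereas the untimed wait gives no feedback, which matches the "stops processing without any feedback" symptom in the report.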
[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init
[ https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520069#comment-16520069 ]

Lukasz Gluchowski commented on KAFKA-7088:
------------------------------------------

[~guozhang] thank you for your response.

The producer is created automatically by Kafka Streams (that's why I categorized it as a streams issue). I didn't override the {{ProducerConfig.MAX_BLOCK_MS_CONFIG}} property, so we rely on the default value (FYI, I put all config properties that we explicitly set in the "Environment" section of this ticket).

The broker itself is healthy and we didn't experience any network issues when this error happened. We are running multiple Kafka Streams applications and the vast majority of the clients have zero lag.

If this helps, logs from the broker said that the consumer group is empty. Note that this situation is repeatable: I restarted the application and the same situation occurred after ~15 minutes.
{code:java}
kgroups --describe --group mp_ads_publisher_pro_madstorage-web-corotos-prod
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'mp_ads_publisher_pro_madstorage-web-corotos-prod' has no active members.

TOPIC                        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
madstorage-web-corotos-prod  10         32606948        33472402        865454  -            -     -
.
(other 23 partitions)
{code}
Let me know if you need more info.

Thanks