[jira] [Created] (KAFKA-13680) Kafka Streams application remains in RUNNING state although all stream threads shut down
Denis Washington created KAFKA-13680:

Summary: Kafka Streams application remains in RUNNING state although all stream threads shut down
Key: KAFKA-13680
URL: https://issues.apache.org/jira/browse/KAFKA-13680
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.0.0
Reporter: Denis Washington

We have a Kafka Streams application that is configured with {{LogAndFailExceptionHandler}} as deserialization error handler. In the Kafka Streams version we used previously (2.7), a deserialization error that causes all stream threads to shut down would ultimately move the application to the ERROR state. However, after updating to Kafka Streams 3.0.0, we see a different behavior: the stream threads still shut down, but the Kafka Streams application stays in the RUNNING state. It thus gets into a "zombie" state not detected by our monitoring.

It may be worth noting that this application has global state stores, and that the global stream thread was not affected by the deserialization error where we noticed the problem.

--
This message was sent by Atlassian Jira (v8.20.1#820001)
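One way to narrow the monitoring gap described in this report is to look at the stream threads as well as the application state. The sketch below is a plain-Java model of such a check, not Kafka Streams code: `ZombieCheck`, `AppState` and `isHealthy` are hypothetical names, and it assumes that the application state and the per-thread state names have already been read from the Kafka Streams instance in whatever way the version in use allows.

```java
// Hypothetical health check; the enum and the state strings are stand-ins
// chosen for this sketch, not types from the Kafka Streams API.
final class ZombieCheck {
    enum AppState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    private ZombieCheck() { }

    /**
     * Healthy only if the application reports RUNNING or REBALANCING
     * and at least one stream thread is still alive.
     */
    static boolean isHealthy(AppState appState, String[] streamThreadStates) {
        if (appState != AppState.RUNNING && appState != AppState.REBALANCING) {
            return false;
        }
        for (String threadState : streamThreadStates) {
            // Assumption: threads that have shut down report DEAD or PENDING_SHUTDOWN.
            if (!"DEAD".equals(threadState) && !"PENDING_SHUTDOWN".equals(threadState)) {
                return true;
            }
        }
        // No live stream thread (or none listed at all): the "zombie" case.
        return false;
    }
}
```

With such a check, an application that still reports RUNNING while every stream thread is dead would be flagged as unhealthy instead of passing unnoticed.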
[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880227#comment-16880227 ]

Denis Washington commented on KAFKA-8630:

Looks like the {{InMemoryWindowStore}} needs the extra methods of the {{InternalProcessorContext}} interface only for recording metrics. Perhaps it could just surround those blocks with {{if (context instanceof InternalProcessorContext)}}, but otherwise also accept other {{ProcessorContext}} implementations like {{MockProcessorContext}}.

> Unit testing a streams processor with a WindowStore throws a ClassCastException
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
> Issue Type: Bug
> Components: streams-test-utils
> Affects Versions: 2.3.0
> Reporter: Justin Fetherolf
> Priority: Major
>
> I was attempting to write a unit test for a class implementing the {{Processor}} interface that contained a {{WindowStore}}, but running the test fails with a {{ClassCastException}} coming out of {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
>
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
>
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>
>   @Override
>   public void init(ProcessorContext context) {
>     this.context = context;
>     windowStore = (WindowStore<String, String>) context.getStateStore("my-win-store");
>   }
>
>   @Override
>   public void process(String key, String value) {
>   }
>
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
>
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
>
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>
>   @Before
>   public void setup() {
>     processor = new InMemWindowProcessor();
>     context = new MockProcessorContext();
>     WindowStore<String, String> store =
>       Stores.windowStoreBuilder(
>         Stores.inMemoryWindowStore(
>           "my-win-store",
>           Duration.ofMinutes(10),
>           Duration.ofSeconds(10),
>           false
>         ),
>         Serdes.String(),
>         Serdes.String()
>       )
>       .withLoggingDisabled()
>       .build();
>     store.init(context, store);
>     context.register(store, null);
>     processor.init(context);
>   }
>
>   @Test
>   public void testThings() {
>     Instant baseTime = Instant.now();
>     context.setTimestamp(baseTime.toEpochMilli());
>     context.setTopic("topic-name");
>     processor.process("key1", "value1");
>   }
> }
> {code}
>
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: "unix"
> {noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest) Time elapsed: 0.05 sec <<< ERROR!
> java.lang.ClassCastException: org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext
>   at org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
>   at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>   at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
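The {{instanceof}} guard suggested in the comment above can be sketched in isolation. This is a minimal model under stated assumptions, not the actual {{InMemoryWindowStore}} code: `Context`, `InternalContext`, `MockContext`, `CountingInternalContext` and `GuardedStore` are hypothetical stand-ins for {{ProcessorContext}}, {{InternalProcessorContext}}, {{MockProcessorContext}}, the runtime context and the store, and `recordMetric` is an invented method.

```java
// Stand-ins for ProcessorContext and InternalProcessorContext; the method
// names below are invented for this sketch.
interface Context {
    String applicationId();
}

interface InternalContext extends Context {
    void recordMetric(String name);
}

// Plays the role of MockProcessorContext: a plain Context without metrics support.
final class MockContext implements Context {
    @Override
    public String applicationId() {
        return "test-app";
    }
}

// Plays the role of the runtime context and counts the metrics it receives.
final class CountingInternalContext implements InternalContext {
    int recorded = 0;

    @Override
    public String applicationId() {
        return "real-app";
    }

    @Override
    public void recordMetric(String name) {
        recorded++;
    }
}

final class GuardedStore {
    private Context context;

    // Accepts any Context; metrics are recorded only when the context supports them.
    void init(Context context) {
        this.context = context;
        if (context instanceof InternalContext) {
            ((InternalContext) context).recordMetric("store-init");
        }
    }

    boolean isInitialized() {
        return context != null;
    }
}
```

Because the cast happens only behind the type check, initializing the store with the mock context no longer throws, while the internal context still receives its metric.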
[jira] [Updated] (KAFKA-8635) Unnecessary wait when looking up coordinator before transactional request
[ https://issues.apache.org/jira/browse/KAFKA-8635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Denis Washington updated KAFKA-8635:

Description:
In our Kafka Streams applications (with EOS enabled), we were seeing mysterious long delays between records being produced by a stream task and the same records being consumed by the next task. These delays turned out to always be around {{retry.backoff.ms}} long; reducing that value reduced the delays by about the same amount.

After digging further, I pinned down the problem to the following lines in {{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

{{} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {}}
{{    // as long as there are outstanding transactional requests, we simply wait for them to return}}
{{    client.poll(retryBackoffMs, time.milliseconds());}}
{{    return;}}
{{}}}

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns true, a transactional request has been sent out that should be waited for. However, this is not true if the request requires a coordinator lookup:

{{if (nextRequestHandler.needsCoordinator()) {}}
{{    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());}}
{{    if (targetNode == null) {}}
{{        transactionManager.lookupCoordinator(nextRequestHandler);}}
{{        break;}}
{{    }}}
{{    ...}}

{{lookupCoordinator()}} does not actually send anything, but just enqueues a coordinator lookup request for the {{Sender}}'s next run loop iteration. {{maybeSendTransactionalRequest}} still returns true, though (the {{break}} jumps to a {{return true}} at the end of the method), leading the {{Sender}} to needlessly wait via {{client.poll()}} although there is actually no request in-flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if it merely enqueues the coordinator lookup instead of actually sending anything. But I'm not sure, hence the bug report instead of a pull request.
> Unnecessary wait when looking up coordinator before transactional request
>
> Key: KAFKA-8635
> URL: https://issues.apache.org/jira/browse/KAFKA-8635
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.3.0, 2.2.1
> Reporter: Denis Washington
> Priority: Major
>
> In our Kafka Streams applications (with EOS enabled), we were seeing mysterious long delays between records being produced by a stream task and the same records being consumed by the next task. These delays turned out to always be around {{retry.backoff.ms}} long; reducing that value reduced the delays by about the same amount.
> After digging further, I pinned down the problem to the following lines in {{org.apache.kafka.clients.pro
[jira] [Created] (KAFKA-8635) Unnecessary wait when looking up coordinator before transactional request
Denis Washington created KAFKA-8635:

Summary: Unnecessary wait when looking up coordinator before transactional request
Key: KAFKA-8635
URL: https://issues.apache.org/jira/browse/KAFKA-8635
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.2.1, 2.3.0
Reporter: Denis Washington

In our Kafka Streams applications (with EOS enabled), we were seeing mysterious long delays between records being produced by a stream task and the same records being consumed by the next task. These delays turned out to always be around {{retry.backoff.ms}} long; reducing that value reduced the delays by about the same amount.

After digging further, I pinned down the problem to the following lines in {{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

{{} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {}}
{{    // as long as there are outstanding transactional requests, we simply wait for them to return}}
{{    client.poll(retryBackoffMs, time.milliseconds());}}
{{    return;}}
{{}}}

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns true, a transactional request has been sent out that should be waited for. However, this is not true if the request requires a coordinator lookup:

{{if (nextRequestHandler.needsCoordinator()) {}}
{{    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());}}
{{    if (targetNode == null) {}}
{{        transactionManager.lookupCoordinator(nextRequestHandler);}}
{{        break;}}
{{    }}}
{{    ...}}

{{lookupCoordinator()}} does not actually send anything, but just enqueues a coordinator lookup request for the {{Sender}}'s next run loop iteration. {{maybeSendTransactionalRequest}} still returns true, though (the {{break}} jumps to a {{return true}} at the end of the method), leading the {{Sender}} to needlessly wait via {{client.poll()}} although there is actually no request in-flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if it merely enqueues the coordinator lookup instead of actually sending anything. But I'm not sure, hence the bug report instead of a pull request.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
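The effect described in this report can be illustrated with a toy model of the run loop. The sketch below is not the real {{Sender}} or {{TransactionManager}} code: `SenderLoopModel`, its fields and `idleWaitUntilSent` are hypothetical, the backoff is fixed at 100 ms, and the model assumes that a poll returns immediately when a request is in flight and blocks for the full backoff when nothing is.

```java
// Toy model of the reported run loop; every name here is hypothetical.
final class SenderLoopModel {
    static final long RETRY_BACKOFF_MS = 100L;

    private final boolean returnFalseWhenOnlyEnqueued; // the fix proposed in the report
    private boolean coordinatorKnown = false;
    private boolean lookupEnqueued = false;
    private boolean requestInFlight = false;
    private boolean txnRequestSent = false;
    private long idleWaitMs = 0L;

    SenderLoopModel(boolean returnFalseWhenOnlyEnqueued) {
        this.returnFalseWhenOnlyEnqueued = returnFalseWhenOnlyEnqueued;
    }

    // Returns true when the caller should wait for a response.
    private boolean maybeSendTransactionalRequest() {
        if (lookupEnqueued) {
            lookupEnqueued = false;
            requestInFlight = true;  // the coordinator lookup goes on the wire
            coordinatorKnown = true; // model: its response arrives during the next poll
            return true;
        }
        if (!coordinatorKnown) {
            lookupEnqueued = true;   // only enqueued, nothing has been sent
            return !returnFalseWhenOnlyEnqueued;
        }
        requestInFlight = true;      // the transactional request itself is sent
        txnRequestSent = true;
        return true;
    }

    private void runOnce() {
        if (maybeSendTransactionalRequest()) {
            if (requestInFlight) {
                requestInFlight = false;        // poll returns once the response arrives
            } else {
                idleWaitMs += RETRY_BACKOFF_MS; // poll blocks for the whole backoff
            }
        }
    }

    // Runs the loop until the transactional request is sent; returns the idle wait in ms.
    static long idleWaitUntilSent(boolean returnFalseWhenOnlyEnqueued) {
        SenderLoopModel model = new SenderLoopModel(returnFalseWhenOnlyEnqueued);
        while (!model.txnRequestSent) {
            model.runOnce();
        }
        return model.idleWaitMs;
    }
}
```

In this model the current behaviour costs one full backoff before the transactional request goes out, and returning false for the enqueue-only case removes that wait. Whether the same change is safe in the real client is exactly the open question raised in the report.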