[jira] [Created] (KAFKA-13680) Kafka Streams application remains in RUNNING state although all stream threads shut down

2022-02-22 Thread Denis Washington (Jira)
Denis Washington created KAFKA-13680:


 Summary: Kafka Streams application remains in RUNNING state 
although all stream threads shut down
 Key: KAFKA-13680
 URL: https://issues.apache.org/jira/browse/KAFKA-13680
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.0.0
Reporter: Denis Washington


We have a Kafka Streams application that is configured with 
{{LogAndFailExceptionHandler}} as its deserialization exception handler. With the 
Kafka Streams version we used previously (2.7), a deserialization error that 
caused all stream threads to shut down would ultimately move the application to 
the ERROR state.

However, after updating to Kafka Streams 3.0.0, we see different behavior: 
the stream threads still shut down, but the Kafka Streams application stays in 
the RUNNING state. It thus ends up in a "zombie" state that our monitoring does 
not detect.

It may be worth noting that this application has global state stores, and that 
the global stream thread was not affected by the deserialization error where we 
noticed the problem.
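
For reference, a minimal sketch of how the application is wired up and how our 
monitoring expects to observe the failure (the application id, bootstrap servers, 
topology, and alerting hook are placeholders, not our actual setup):

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;

public class ExampleApp {  // placeholder name

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        LogAndFailExceptionHandler.class);

    Topology topology = buildTopology();  // actual topology elided
    KafkaStreams streams = new KafkaStreams(topology, props);

    // Expected behavior (seen on 2.7, no longer on 3.0.0): once the last stream
    // thread has died from the deserialization error, the client transitions to
    // ERROR, which is what our monitoring listens for.
    streams.setStateListener((newState, oldState) -> {
      if (newState == KafkaStreams.State.ERROR) {
        System.err.println("Streams application entered ERROR state");  // alerting hook (placeholder)
      }
    });
    streams.start();
  }

  private static Topology buildTopology() {
    // topology with global state stores elided
    return new Topology();
  }
}
{code}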



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-08 Thread Denis Washington (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880227#comment-16880227
 ] 

Denis Washington commented on KAFKA-8630:
-

Looks like the {{InMemoryWindowStore}} needs the extra methods of the 
{{InternalProcessorContext}} interface only for recording metrics. Perhaps it 
could just surround those blocks with {{if (context instanceof 
InternalProcessorContext)}}, but otherwise also accept other 
{{ProcessorContext}} implementations like {{MockProcessorContext}}.
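
To illustrate the idea, a rough, hypothetical sketch of what such a guard could 
look like in {{InMemoryWindowStore.init}} (this is not the actual Kafka code; the 
metric setup and restore logic are elided):

{code:java}
@Override
public void init(final ProcessorContext context, final StateStore root) {
    // Only touch the internal interface when it is actually available, i.e. when
    // running inside the real Streams runtime.
    if (context instanceof InternalProcessorContext) {
        final InternalProcessorContext internalContext = (InternalProcessorContext) context;
        // ... set up the store metrics via internalContext (elided) ...
    }

    // Everything else only needs the public ProcessorContext, so a
    // MockProcessorContext from streams-test-utils would be accepted as well.
    context.register(root, (key, value) -> {
        // restore logic elided
    });
}
{code}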

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
>
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
>
> public class InMemWindowProcessor implements Processor<String, String> {
>
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>
>   @Override
>   public void init(ProcessorContext context) {
>     this.context = context;
>     windowStore = (WindowStore<String, String>) context.getStateStore("my-win-store");
>   }
>
>   @Override
>   public void process(String key, String value) {
>   }
>
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
>
> import java.time.Duration;
> import java.time.Instant;
>
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
>
> public class InMemWindowProcessorTest {
>
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>
>   @Before
>   public void setup() {
>     processor = new InMemWindowProcessor();
>     context = new MockProcessorContext();
>
>     WindowStore<String, String> store =
>       Stores.windowStoreBuilder(
>         Stores.inMemoryWindowStore(
>           "my-win-store",
>           Duration.ofMinutes(10),
>           Duration.ofSeconds(10),
>           false
>         ),
>         Serdes.String(),
>         Serdes.String()
>       )
>       .withLoggingDisabled()
>       .build();
>
>     store.init(context, store);
>     context.register(store, null);
>     processor.init(context);
>   }
>
>   @Test
>   public void testThings() {
>     Instant baseTime = Instant.now();
>     context.setTimestamp(baseTime.toEpochMilli());
>     context.setTopic("topic-name");
>     processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
> at 
> 

[jira] [Updated] (KAFKA-8635) Unnecessary wait when looking up coordinator before transactional request

2019-07-08 Thread Denis Washington (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Denis Washington updated KAFKA-8635:

Description: 
In our Kafka Streams applications (with EOS enabled), we were seeing mysterious 
long delays between records being produced by a stream task and the same 
records being consumed by the next task. These delays turned out to always be 
around {{retry.backoff.ms}} long; reducing that value reduced the delays by 
about the same amount.

After digging further, I pinned down the problem to the following lines in 
{{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

{code:java}
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {
    // as long as there are outstanding transactional requests, we simply wait for them to return
    client.poll(retryBackoffMs, time.milliseconds());
    return;
}
{code}

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns 
true, a transactional request has been sent out that should be waited for. 
However, this is not true if the request requires a coordinator lookup:

{code:java}
if (nextRequestHandler.needsCoordinator()) {
    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
    if (targetNode == null) {
        transactionManager.lookupCoordinator(nextRequestHandler);
        break;
    }
    ...
{code}

{{lookupCoordinator()}} does not actually send anything, but just enqueues a 
coordinator lookup request for the {{Sender}}'s next run loop iteration. 
{{maybeSendTransactionalRequest}} still returns true, though (the {{break}} 
jumps to a {{return true}} at the end of the method), leading the {{Sender}} to 
needlessly wait via {{client.poll()}} although there is actually no request 
in-flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if 
it merely enqueues the coordinator lookup instead of actually sending anything. 
But I'm not sure, hence the bug report instead of a pull request.

  was:
In our Kafka Streams applications (with EOS enabled), we were seeing mysterious 
long delays between records being produced by a stream task and the same 
records being consumed by the next task. These delays turned out to always be 
around {{retry.backoff.ms}} long; reducing that value reduced the delays by 
about the same amount.

After digging further, I pinned down the problem to the following lines in 
{{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

{code:java}
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {
    // as long as there are outstanding transactional requests, we simply wait for them to return
    client.poll(retryBackoffMs, time.milliseconds());
    return;
}
{code}

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns 
true, a transactional request has been sent out that should be waited for. 
However, this is not true if the request requires a coordinator lookup:

{code:java}
if (nextRequestHandler.needsCoordinator()) {
    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
    if (targetNode == null) {
        transactionManager.lookupCoordinator(nextRequestHandler);
        break;
    }
    ...
{code}

{{lookupCoordinator()}} does not actually send anything, but just enqueues a 
coordinator lookup request for the {{Sender}}'s next run loop iteration. 
{{maybeSendTransactionalRequest}} still returns true, though (the {{break}} 
jumps to a {{return true}} at the end of the method), leading the {{Sender}} to 
needlessly wait via {{client.poll()}} although there is actually no request 
in-flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if 
it merely enqueues the coordinator lookup instead of actually sending anything. 
But I'm not sure, hence the bug report instead of a pull request.


> Unnecessary wait when looking up coordinator before transactional request
> -
>
> Key: KAFKA-8635
> URL: https://issues.apache.org/jira/browse/KAFKA-8635
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Denis Washington
>Priority: Major
>
> In our Kafka Streams applications (with EOS enabled), we were seeing 
> mysterious long delays between records being produced by a stream task and 
> the same records being consumed by the next task. These delays turned out to 
> always be around {{retry.backoff.ms}} long; reducing that value reduced the 
> delays by about the same amount.
> After digging further, I pinned down the problem to the following lines in 
> 

[jira] [Created] (KAFKA-8635) Unnecessary wait when looking up coordinator before transactional request

2019-07-08 Thread Denis Washington (JIRA)
Denis Washington created KAFKA-8635:
---

 Summary: Unnecessary wait when looking up coordinator before 
transactional request
 Key: KAFKA-8635
 URL: https://issues.apache.org/jira/browse/KAFKA-8635
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.2.1, 2.3.0
Reporter: Denis Washington


In our Kafka Streams applications (with EOS enabled), we were seeing mysterious 
long delays between records being produced by a stream task and the same 
records being consumed by the next task. These delays turned out to always be 
around {{retry.backoff.ms}} long; reducing that value reduced the delays by 
about the same amount.

After digging further, I pinned down the problem to the following lines in 
{{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

{code:java}
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {
    // as long as there are outstanding transactional requests, we simply wait for them to return
    client.poll(retryBackoffMs, time.milliseconds());
    return;
}
{code}

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns 
true, a transactional request has been sent out that should be waited for. 
However, this is not true if the request requires a coordinator lookup:

{code:java}
if (nextRequestHandler.needsCoordinator()) {
    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
    if (targetNode == null) {
        transactionManager.lookupCoordinator(nextRequestHandler);
        break;
    }
    ...
{code}

{{lookupCoordinator()}} does not actually send anything, but just enqueues a 
coordinator lookup request for the {{Sender}}'s next run loop iteration. 
{{maybeSendTransactionalRequest}} still returns true, though (the {{break}} 
jumps to a {{return true}} at the end of the method), leading the {{Sender}} to 
needlessly wait via {{client.poll()}} although there is actually no request 
in-flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if 
it merely enqueues the coordinator lookup instead of actually sending anything. 
But I'm not sure, hence the bug report instead of a pull request.
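
To make the suggestion concrete, here is a hypothetical sketch of the change 
inside {{maybeSendTransactionalRequest}} (not actual Kafka code, and it glosses 
over whatever the surrounding loop does after the current {{break}}):

{code:java}
if (nextRequestHandler.needsCoordinator()) {
    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
    if (targetNode == null) {
        // This only enqueues a coordinator lookup for the next run loop
        // iteration; nothing is in flight yet, so signal the caller not to
        // block in client.poll() with the retry backoff.
        transactionManager.lookupCoordinator(nextRequestHandler);
        return false;
    }
    ...
}
{code}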



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)