[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-08 Thread Justin Fetherolf (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880923#comment-16880923
 ] 

Justin Fetherolf commented on KAFKA-8630:
-

The `ProcessorContext` interface already offers a `metrics()` method, and 
`InternalProcessorContext` for some reason overrides it to return a 
`StreamsMetricsImpl` rather than `StreamsMetrics`.

From the bit of poking around in the code I've done this evening, it seems to be 
a mixed bag: some stores need just this `metrics()` method (and thus wouldn't 
need the cast), while others also need the other methods that 
`InternalProcessorContext` offers.
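
To illustrate the difference (a rough sketch only; the store class below is made up 
and this is not actual Streams code):

{code:java}
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical store that only needs metrics: it can stay on the public
// interface and therefore works with MockProcessorContext.
class MetricsOnlyStore {
  private StreamsMetrics metrics;

  public void init(final ProcessorContext context) {
    // Public API: ProcessorContext.metrics() returns StreamsMetrics.
    metrics = context.metrics();

    // What the in-memory window store effectively does today, and what fails
    // for MockProcessorContext:
    // final StreamsMetricsImpl metrics = ((InternalProcessorContext) context).metrics();
  }
}
{code}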

Is [KIP-448|https://cwiki.apache.org/confluence/x/SAeZBg] the possibly 
pre-existing KIP you're thinking of?

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore<String, String>) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore<String, String> store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> 

[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-08 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880893#comment-16880893
 ] 

Sophie Blee-Goldman commented on KAFKA-4212:


I agree with Matthias that, if we do want to go this route, we should do so by 
just exposing TTL through RocksDB rather than adding a new kind of StateStore 
as a first-class citizen, to make it clear we are just falling back to 
RocksDB's TTL functionality. The store hierarchy is already complicated enough, 
and we should avoid adding layers as much as possible.

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time, i.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently, Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit for this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8640) Replace OffsetFetch request/response with automated protocol

2019-07-08 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8640:
--

 Summary: Replace OffsetFetch request/response with automated 
protocol
 Key: KAFKA-8640
 URL: https://issues.apache.org/jira/browse/KAFKA-8640
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-08 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8630:
---
Comment: was deleted

(was: I agree with Justin. Testing using internal classes should be avoided. 
That is the reason why we added the test-utils package to begin with.

About testing for the type of the context: this would introduce a 
dependency from the "main" module to the test-utils module – besides the fact 
that the main code should not know anything about the test code, it would also 
be a cyclic dependency.

We need to address this differently, maybe by adding a new interface that both 
the internal and mocked context classes can implement and that we use for the 
cast. This might require a KIP though.)

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore<String, String>) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore<String, String> store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
>   

[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-08 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880891#comment-16880891
 ] 

Matthias J. Sax commented on KAFKA-8630:


I agree with Justin. Testing using internal classes should be avoided. That is 
the reason why we added the test-utils package to begin with.

About testing for the type of the context: this would introduce a 
dependency from the "main" module to the test-utils module – besides the fact 
that the main code should not know anything about the test code, it would also 
be a cyclic dependency.

We need to address this differently, maybe by adding a new interface that both 
the internal and mocked context classes can implement and that we use for the 
cast. This might require a KIP though.

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore<String, String>) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore<String, String> store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> 

[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-08 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880886#comment-16880886
 ] 

Sophie Blee-Goldman commented on KAFKA-8630:


You're right, I overlooked that InternalMockProcessorContext is not part of the 
public API and definitely shouldn't be used. In general users are encouraged to 
do their testing through the TopologyTestDriver. You can access state stores 
through it and they will be managed for you so you don't have to worry about 
calling init or close. (This is especially important with the persistent 
variants of the stores as they will leak memory if not properly closed.)
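
For illustration, a minimal sketch of that approach for the processor in this 
ticket (assuming the 2.3-era test-utils API with `ConsumerRecordFactory`; newer 
releases use `TestInputTopic` instead, and the topic/test names below are made up):

{code:java}
package com.cantgetthistowork;

import static org.junit.Assert.assertNotNull;

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.Test;

public class InMemWindowProcessorDriverTest {

  @Test
  public void testThroughDriver() {
    // Build a tiny topology around the processor from this ticket; the driver
    // inits and closes the store itself, so no cast to internal classes is needed.
    Topology topology = new Topology();
    topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input-topic");
    topology.addProcessor("proc", InMemWindowProcessor::new, "source");
    topology.addStateStore(
        Stores.windowStoreBuilder(
            Stores.inMemoryWindowStore("my-win-store", Duration.ofMinutes(10), Duration.ofSeconds(10), false),
            Serdes.String(),
            Serdes.String()).withLoggingDisabled(),
        "proc");

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
      ConsumerRecordFactory<String, String> factory =
          new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());
      driver.pipeInput(factory.create("key1", "value1"));

      // The store was initialized by the driver and can be inspected directly.
      assertNotNull(driver.getWindowStore("my-win-store"));
    }
  }
}
{code}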

That said, I can see how this would be useful even if not necessary. If you are 
interested in picking this up, I'd be happy to help! Just to outline the 
problem clearly, along with some possible solutions:

This affects all window and session stores, and may affect key-value stores in 
the future should we choose to add metrics. The problem is that the stores cast 
to InternalProcessorContext in order to access the StreamsMetrics, but the 
MockProcessorContext intended for testing does not extend this class. I don't 
think it's ideal to just add "if" guards everywhere this internal context or 
the related metrics/sensors are used in the actual streams code, just to make 
room for some test code.

One possibility would be to have the Internal and Mock processor contexts 
implement a common interface that has a `metrics()` method, and just return a 
dummy metrics object in the Mock case. Alternatively, we could add some kind of 
Mock stores that hide the internals and just present a stable interface for 
testing – either way we would need a KIP. I believe there already was a KIP 
regarding mock stores, though, so maybe this could be part of that.
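
As a very rough sketch of that first option (every name below is hypothetical, 
not an agreed-on API):

{code:java}
import org.apache.kafka.streams.StreamsMetrics;

// Hypothetical "common interface": the stores would cast the ProcessorContext to
// this instead of InternalProcessorContext. The real internal context would return
// its StreamsMetricsImpl here, while MockProcessorContext could return a dummy,
// no-op StreamsMetrics implementation.
interface MetricsRecordingContext {
  StreamsMetrics metrics();

  // ...plus whatever store-level sensor accessors the window/session stores
  // actually need, expressed without referencing internal classes.
}
{code}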

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore<String, String>) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore<String, String> store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> 

[jira] [Updated] (KAFKA-8620) Race condition in StreamThread state change

2019-07-08 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8620:
---
Summary: Race condition in StreamThread state change  (was: Fix potential 
race condition in StreamThread state change)

> Race condition in StreamThread state change
> ---
>
> Key: KAFKA-8620
> URL: https://issues.apache.org/jira/browse/KAFKA-8620
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the call to `StreamThread.addRecordsToTasks` we don't have synchronization 
> when we attempt to extract active tasks. If after one long poll in runOnce 
> the application state changes to PENDING_SHUTDOWN, there is a potential close 
> on TaskManager which erases the active tasks map, thus triggering NPE and 
> bringing the thread state to a false shutdown.
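
For illustration, the shape of that race in miniature (all names below are made 
up; this is not the actual StreamThread/TaskManager code):

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RaceSketch {
  private final Map<Integer, Task> activeTasks = new HashMap<>();

  // Stream thread: runs after a long poll returned records.
  void addRecordsToTasks(final int partition, final List<String> records) {
    final Task task = activeTasks.get(partition); // null if close() ran concurrently
    task.addRecords(records);                     // -> NullPointerException
  }

  // Shutdown path (PENDING_SHUTDOWN): clears the active-task map on another thread.
  void close() {
    activeTasks.clear();
  }

  interface Task {
    void addRecords(List<String> records);
  }
}
{code}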



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-08 Thread Justin Fetherolf (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880831#comment-16880831
 ] 

Justin Fetherolf commented on KAFKA-8630:
-

[~ableegoldman] I personally feel that this is still a problem and shouldn't be 
closed.

Trying to find `InternalMockProcessorContext` revealed another class, 
`MockInternalProcessorContext`, both of which are in test code and not 
available via the public packages (or at least not in `kafka-streams` or 
`kafka-streams-test-utils`, which seem like the logical places for them).

I think the root problem here is that the necessary mock contexts are not 
available in the proper package, and that there is no documentation to support 
their use cases.

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore<String, String>) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore<String, String> store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> 

[jira] [Commented] (KAFKA-8615) Change to track partition time breaks TimestampExtractor

2019-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880820#comment-16880820
 ] 

ASF GitHub Bot commented on KAFKA-8615:
---

ableegoldman commented on pull request #7054: KAFKA-8615: Change to track 
partition time breaks TimestampExtractor
URL: https://github.com/apache/kafka/pull/7054
 
 
   We should be setting `headRecord` to null AFTER calling `updateHead()` since 
it calls `timestamp()` for the TimestampExtractor, which returns `UNKNOWN` if 
`head == null`
   
   Should be cherry-picked back to 2.3
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Change to track partition time breaks TimestampExtractor
> 
>
> Key: KAFKA-8615
> URL: https://issues.apache.org/jira/browse/KAFKA-8615
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> From the users mailing list, *UPDATED* by Jonathan Santilli:
> {noformat}
> Am testing the new version 2.3 for Kafka Streams specifically. I have noticed 
> that now, the implementation of the method extract from the
> interface org.apache.kafka.streams.processor.TimestampExtractor:
> public class OwnTimeExtractor implements TimestampExtractor {
>     @Override
>     public long extract(final ConsumerRecord<Object, Object> record, final 
> long previousTimestamp) {
>         // previousTimestamp is always == -1. For version 2.3
>     }
> }
> Previous version 2.2.1 was returning the correct value for the record 
> partition.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8639) Replace AddPartitionsToTxn request/response with automated protocol

2019-07-08 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8639:
--

 Summary: Replace AddPartitionsToTxn request/response with 
automated protocol
 Key: KAFKA-8639
 URL: https://issues.apache.org/jira/browse/KAFKA-8639
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8616) Replace ApiVersionsRequest request/response with automated protocol

2019-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880789#comment-16880789
 ] 

ASF GitHub Bot commented on KAFKA-8616:
---

abbccdda commented on pull request #7052: KAFKA-8616: Replace 
ApiVersionsRequest request/response with automated protocol
URL: https://github.com/apache/kafka/pull/7052
 
 
   As title.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Replace ApiVersionsRequest request/response with automated protocol
> ---
>
> Key: KAFKA-8616
> URL: https://issues.apache.org/jira/browse/KAFKA-8616
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-08 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880771#comment-16880771
 ] 

Sophie Blee-Goldman commented on KAFKA-8630:


Yes, you're likely to hit this problem with some of the other store 
implementations as well, which also cast to `InternalProcessorContext`; this is 
needed in order to access the metrics. However, you should be able to run your 
tests by just using an `InternalMockProcessorContext` instead.

 

[~fetherolfjd] Can I close this as "not a problem"?

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore<String, String>) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore<String, String> store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
> at 

[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-08 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880761#comment-16880761
 ] 

Matthias J. Sax commented on KAFKA-4212:


\cc [~guozhang] [~bbejeck] WDYT?

[~ctoomey]: just to give a heads-up, the current PR is a public API change and 
would require a KIP. Public API changes are always more difficult, and I am 
wondering (in case Guozhang and Bill support the request to ship a best-effort 
feature) whether an internal change via `RocksDBConfigSetter` might be a 
better approach.
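
For context, a minimal sketch of the `RocksDBConfigSetter` hook (the class name 
and option values below are illustrative only, and whether RocksDB's TTL mode 
can actually be reached from this hook is exactly the open question here):

{code:java}
import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Hypothetical example of per-store RocksDB tuning done "internally",
// i.e. without any public Streams API change.
public class TuningConfigSetter implements RocksDBConfigSetter {
  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
    // Streams calls this once per RocksDB store before opening it.
    options.setMaxWriteBufferNumber(3);
    options.setMaxOpenFiles(6144);
  }
}

// Wired in via:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TuningConfigSetter.class);
{code}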

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time, i.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently, Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit for this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8026) Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted

2019-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880756#comment-16880756
 ] 

ASF GitHub Bot commented on KAFKA-8026:
---

emschwar commented on pull request #7051: KAFKA-8026: Fix for Flaky 
RegexSourceIntegrationTest
URL: https://github.com/apache/kafka/pull/7051
 
 
   This is a 2.3-idiomatic recreation of Bill Bejeck's [original 
patch](https://github.com/nexiahome/kafka/commit/92c591d) for the 1.1 branch, 
which seems to have been inadvertently omitted. I'm not sure if this PR ought to 
be against the 2.3 branch or trunk; it should apply cleanly to either.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted
> 
>
> Key: KAFKA-8026
> URL: https://issues.apache.org/jira/browse/KAFKA-8026
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 1.0.2, 1.1.1
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>Priority: Critical
>  Labels: flaky-test
> Fix For: 1.0.3, 1.1.2
>
>
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Stream tasks not updated
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
> at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:215){quote}
> Happend in 1.0 and 1.1 builds:
> [https://builds.apache.org/blue/organizations/jenkins/kafka-1.0-jdk7/detail/kafka-1.0-jdk7/263/tests/]
> and
> [https://builds.apache.org/blue/organizations/jenkins/kafka-1.1-jdk7/detail/kafka-1.1-jdk7/249/tests/]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8637) WriteBatch objects leak off-heap memory

2019-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880748#comment-16880748
 ] 

ASF GitHub Bot commented on KAFKA-8637:
---

ableegoldman commented on pull request #7050: KAFKA-8637: WriteBatch objects 
leak off-heap memory
URL: https://github.com/apache/kafka/pull/7050
 
 
   Should be cherry-picked back to 2.1  (`batch.close()` should be moved to 
`RocksDBSegmentedBytesStore#restoreAllInternal` for 2.2 and earlier)
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> WriteBatch objects leak off-heap memory
> ---
>
> Key: KAFKA-8637
> URL: https://issues.apache.org/jira/browse/KAFKA-8637
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> In 2.1 we did some refactoring that led to the WriteBatch objects in 
> RocksDBSegmentedBytesStore#restoreAllInternal being created in a separate 
> method, rather than in a try-with-resources statement as used elsewhere. This 
> causes a memory leak as the WriteBatches are no longer closed automatically
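
For illustration, the leak pattern being described, in miniature (method names 
below are made up, not the actual store code; `WriteBatch` holds native, 
off-heap memory, so it must be closed explicitly):

{code:java}
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;

class WriteBatchLeakSketch {
  // Leaky shape: the batch is created in a helper and never closed.
  WriteBatch createBatch(byte[] key, byte[] value) throws RocksDBException {
    WriteBatch batch = new WriteBatch();
    batch.put(key, value);
    return batch; // if no caller ever closes it, the native memory leaks
  }

  // Fixed shape: try-with-resources frees the native handle deterministically.
  void restore(byte[] key, byte[] value) throws RocksDBException {
    try (WriteBatch batch = new WriteBatch()) {
      batch.put(key, value);
      // write the batch to the store here
    } // batch.close() releases the off-heap memory
  }
}
{code}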



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)

2019-07-08 Thread GEORGE LI (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GEORGE LI updated KAFKA-8638:
-
Description: 
Currently, Kafka preferred leader election will pick the broker_id in the 
topic/partition replica assignment in priority order when the broker is in 
ISR. The preferred leader is the broker id in the first position of the replica 
list. There are use cases where, even though the first broker in the replica 
assignment is in ISR, it needs to be moved to the end of the ordering (lowest 
priority) when deciding leadership during preferred leader election. 

Let's use a topic/partition with replicas (1,2,3) as an example; 1 is the 
preferred leader. When preferred leader election is run, it will pick 1 as the 
leader if it is in ISR; if 1 is not online or not in ISR, it picks 2; if 2 is 
not in ISR, it picks 3 as the leader. There are use cases where, even though 1 
is in ISR, we would like it to be moved to the end of the ordering (lowest 
priority) when deciding leadership during preferred leader election. Below is a 
list of use cases:

* If broker_id 1 is a swapped failed host brought up with only the last 
segments or the latest offset, without historical data (there is another effort 
on this), it's better for it not to serve leadership until it has caught up.

* A cross-data-center cluster has AWS instances which have less computing 
power than the on-prem bare-metal machines. We could put the AWS broker_ids in 
the Preferred Leader Blacklist, so on-prem brokers can be elected leaders, 
without changing the reassignment ordering of the replicas. 

* If broker_id 1 is constantly losing leadership after some time 
("flapping"), we would want to exclude 1 from being a leader unless all other 
brokers of this topic/partition are offline. The "flapping" effect was seen in 
the past when 2 or more brokers were bad: when they lost leadership 
constantly/quickly, the sets of partition replicas they belong to would see 
leadership constantly changing. The ultimate solution is to swap these bad 
hosts, but for quick mitigation we can also put the bad hosts in the 
Preferred Leader Blacklist to move their priority for being elected leader 
to the lowest. 

* If the controller is busy serving an extra load of metadata requests and 
other tasks, we would like to move the controller's leaderships to other 
brokers to lower its CPU load. Currently, bouncing the broker to make it lose 
leadership would not work for the controller, because after the bounce the 
controller fails over to another broker.

* Avoid bouncing a broker in order to make it lose its leadership: it would be 
good if we had a way to specify which broker should be excluded from serving 
traffic/leadership (without changing the replica assignment ordering by 
reassignments, even though that's quick), and then run preferred leader 
election. A bouncing broker will cause temporary URP, and sometimes other 
issues. Also, a bounced broker (e.g. broker_id 1) can temporarily lose all its 
leadership, but if another broker (e.g. broker_id 2) fails or gets bounced, 
some of its leaderships will likely fail over to broker_id 1 on a replica set 
with 3 brokers. If broker_id 1 is in the blacklist, then in such a scenario, 
even with broker_id 2 offline, the 3rd broker can take leadership. 


The current workaround for the above is to change the topic/partition's replica 
assignment to move broker_id 1 from the first position to the last position, 
e.g. (1, 2, 3) => (2, 3, 1), and then run preferred leader election. This 
changes the replica assignment, and we need to keep track of the original one 
and restore it if things change (e.g. the controller fails over to another 
broker, or the swapped empty broker catches up). That's a rather tedious task.
 

  was:
Currently, the kafka preferred leader election will pick the broker_id in the 
topic/partition replica assignments in a priority order when the broker is in 
ISR. The preferred leader is the broker id in the first position of replica. 
There are use-cases that, even the first broker in the replica assignment is in 
ISR, there is a need for it to be moved to the end of ordering (lowest 
priority) when deciding leadership during  preferred leader election. 

Let’s use topic/partition replica (1,2,3) as an example. 1 is the preferred 
leader.  When preferred leadership is run, it will pick 1 as the leader if it's 
ISR, if 1 is not online and in ISR, then pick 2, if 2 is not in ISR, then pick 
3 as the leader. There are use cases that, even 1 is in ISR, we would like it 
to be moved to the end of ordering (lowest priority) when deciding leadership 
during preferred leader election.   Below is a list of use cases:

# If broker_id 1 is a swapped failed host and brought up with last segments or 
latest offset without historical data (There is another effort on this), it's 
better for it to not serve leadership till it's caught-up.

# The cross-data center cluster has AWS instances which 

[jira] [Created] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)

2019-07-08 Thread GEORGE LI (JIRA)
GEORGE LI created KAFKA-8638:


 Summary: Preferred Leader Blacklist (deprioritized list)
 Key: KAFKA-8638
 URL: https://issues.apache.org/jira/browse/KAFKA-8638
 Project: Kafka
  Issue Type: Improvement
  Components: config, controller, core
Affects Versions: 2.2.1, 2.3.0, 1.1.1
Reporter: GEORGE LI
Assignee: GEORGE LI


Currently, Kafka preferred leader election will pick the broker_id in the 
topic/partition replica assignment in priority order when the broker is in 
ISR. The preferred leader is the broker id in the first position of the replica 
list. There are use cases where, even though the first broker in the replica 
assignment is in ISR, it needs to be moved to the end of the ordering (lowest 
priority) when deciding leadership during preferred leader election. 

Let's use a topic/partition with replicas (1,2,3) as an example; 1 is the 
preferred leader. When preferred leader election is run, it will pick 1 as the 
leader if it is in ISR; if 1 is not online or not in ISR, it picks 2; if 2 is 
not in ISR, it picks 3 as the leader. There are use cases where, even though 1 
is in ISR, we would like it to be moved to the end of the ordering (lowest 
priority) when deciding leadership during preferred leader election. Below is a 
list of use cases:

# If broker_id 1 is a swapped failed host brought up with only the last 
segments or the latest offset, without historical data (there is another effort 
on this), it's better for it not to serve leadership until it has caught up.

# A cross-data-center cluster has AWS instances which have less computing 
power than the on-prem bare-metal machines. We could put the AWS broker_ids in 
the Preferred Leader Blacklist, so on-prem brokers can be elected leaders, 
without changing the reassignment ordering of the replicas. 

# If broker_id 1 is constantly losing leadership after some time 
("flapping"), we would want to exclude 1 from being a leader unless all other 
brokers of this topic/partition are offline. The "flapping" effect was seen in 
the past when 2 or more brokers were bad: when they lost leadership 
constantly/quickly, the sets of partition replicas they belong to would see 
leadership constantly changing. The ultimate solution is to swap these bad 
hosts, but for quick mitigation we can also put the bad hosts in the 
Preferred Leader Blacklist to move their priority for being elected leader 
to the lowest. 

# If the controller is busy serving an extra load of metadata requests and 
other tasks, we would like to move the controller's leaderships to other 
brokers to lower its CPU load. Currently, bouncing the broker to make it lose 
leadership would not work for the controller, because after the bounce the 
controller fails over to another broker.

# Avoid bouncing a broker in order to make it lose its leadership: it would be 
good if we had a way to specify which broker should be excluded from serving 
traffic/leadership (without changing the replica assignment ordering by 
reassignments, even though that's quick), and then run preferred leader 
election. A bouncing broker will cause temporary URP, and sometimes other 
issues. Also, a bounced broker (e.g. broker_id 1) can temporarily lose all its 
leadership, but if another broker (e.g. broker_id 2) fails or gets bounced, 
some of its leaderships will likely fail over to broker_id 1 on a replica set 
with 3 brokers. If broker_id 1 is in the blacklist, then in such a scenario, 
even with broker_id 2 offline, the 3rd broker can take leadership. 


The current workaround for the above is to change the topic/partition's replica 
assignment to move broker_id 1 from the first position to the last position, 
e.g. (1, 2, 3) => (2, 3, 1), and then run preferred leader election. This 
changes the replica assignment, and we need to keep track of the original one 
and restore it if things change (e.g. the controller fails over to another 
broker, or the swapped empty broker catches up). That's a rather tedious task.
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams

2019-07-08 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880745#comment-16880745
 ] 

Sophie Blee-Goldman commented on KAFKA-8367:


Hey [~pavelsavov]! Sorry for the long silence. I believe we've tracked down 
another memory leak; if you're able to test it out, the fix can be found here: 
[https://github.com/apache/kafka/pull/7049]

It may not be the only leak, as it affects 2.1 as well as 2.2, but we believe 
it's still an important fix. If this does not help, we'll keep digging.

> Non-heap memory leak in Kafka Streams
> -
>
> Key: KAFKA-8367
> URL: https://issues.apache.org/jira/browse/KAFKA-8367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Pavel Savov
>Priority: Major
> Attachments: memory-prod.png, memory-test.png
>
>
> We have been observing a non-heap memory leak after upgrading to Kafka 
> Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the 
> leak only happens when we enable stateful stream operations (utilizing 
> stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 
> and ported the fix scheduled for release in 2.2.1 to our fork. It did not 
> stop the leak, however.
> We are having this memory leak in our production environment where the 
> consumer group is auto-scaled in and out in response to changes in traffic 
> volume, and in our test environment where we have two consumers, no 
> autoscaling and relatively constant traffic.
> Below is some information I'm hoping will be of help:
>  * RocksDB Config:
> Block cache size: 4 MiB
> Write buffer size: 2 MiB
> Block size: 16 KiB
> Cache index and filter blocks: true
> Manifest preallocation size: 64 KiB
> Max write buffer number: 3
> Max open files: 6144
>  
>  * Memory usage in production
> The attached graph (memory-prod.png) shows memory consumption for each 
> instance as a separate line. The horizontal red line at 6 GiB is the memory 
> limit.
> As illustrated on the attached graph from production, memory consumption in 
> running instances goes up around autoscaling events (scaling the consumer 
> group either in or out) and associated rebalancing. It stabilizes until the 
> next autoscaling event but it never goes back down.
> An example of scaling out can be seen from around 21:00 hrs where three new 
> instances are started in response to a traffic spike.
> Just after midnight traffic drops and some instances are shut down. Memory 
> consumption in the remaining running instances goes up.
> Memory consumption climbs again from around 6:00AM due to increased traffic 
> and new instances are being started until around 10:30AM. Memory consumption 
> never drops until the cluster is restarted around 12:30.
>  
>  * Memory usage in test
> As illustrated by the attached graph (memory-test.png) we have a fixed number 
> of two instances in our test environment and no autoscaling. Memory 
> consumption rises linearly until it reaches the limit (around 2:00 AM on 
> 5/13) and Mesos restarts the offending instances, or we restart the cluster 
> manually.
>  
>  * No heap leaks observed
>  * Window retention: 2 or 11 minutes (depending on operation type)
>  * Issue not present in Kafka Streams 2.0.1
>  * No memory leak for stateless stream operations (when no RocksDB stores are 
> used)
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8637) WriteBatch objects leak off-heap memory

2019-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880729#comment-16880729
 ] 

ASF GitHub Bot commented on KAFKA-8637:
---

ableegoldman commented on pull request #7049: KAFKA-8637: WriteBatch objects 
leak off-heap memory
URL: https://github.com/apache/kafka/pull/7049
 
 
   Should be cherry-picked back to 2.1
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> WriteBatch objects leak off-heap memory
> ---
>
> Key: KAFKA-8637
> URL: https://issues.apache.org/jira/browse/KAFKA-8637
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> In 2.1 we did some refactoring that led to the WriteBatch objects in 
> RocksDBSegmentedBytesStore#restoreAllInternal being created in a separate 
> method, rather than in a try-with-resources statement as used elsewhere. This 
> causes a memory leak as the WriteBatches are no longer closed automatically



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6771) Make specifying partitions in RoundTripWorkload, ProduceBench more flexible

2019-07-08 Thread Colin P. McCabe (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe resolved KAFKA-6771.

Resolution: Fixed

> Make specifying partitions in RoundTripWorkload, ProduceBench more flexible
> ---
>
> Key: KAFKA-6771
> URL: https://issues.apache.org/jira/browse/KAFKA-6771
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
>
> Make specifying partitions in RoundTripWorkload, ProduceBench more flexible



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6263) Expose metric for group metadata loading duration

2019-07-08 Thread Anastasia Vela (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anastasia Vela updated KAFKA-6263:
--
Component/s: core

> Expose metric for group metadata loading duration
> -
>
> Key: KAFKA-6263
> URL: https://issues.apache.org/jira/browse/KAFKA-6263
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jason Gustafson
>Assignee: Anastasia Vela
>Priority: Major
>
> We have seen several cases where, because the log cleaner either wasn't enabled or 
> had experienced some failure, __consumer_offsets partitions grew 
> excessively. When one of these partitions changes leadership, the new 
> coordinator must load the offset cache from the start of the log, which can 
> take arbitrarily long depending on how large the partition has grown (we have 
> seen cases where it took hours). Catching this problem is not always easy 
> because the condition is rare and the symptom just tends to be a long period 
> of inactivity in the consumer group which gradually gets worse over time. It 
> may therefore be useful to have a broker metric for the load time so that it 
> can be monitored and potentially alerted on. The same goes for the 
> transaction log.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6263) Expose metric for group metadata loading duration

2019-07-08 Thread Anastasia Vela (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anastasia Vela updated KAFKA-6263:
--
Labels:   (was: needs-kip)

> Expose metric for group metadata loading duration
> -
>
> Key: KAFKA-6263
> URL: https://issues.apache.org/jira/browse/KAFKA-6263
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Anastasia Vela
>Priority: Major
>
> We have seen several cases where, because the log cleaner either wasn't enabled or 
> had experienced some failure, __consumer_offsets partitions grew 
> excessively. When one of these partitions changes leadership, the new 
> coordinator must load the offset cache from the start of the log, which can 
> take arbitrarily long depending on how large the partition has grown (we have 
> seen cases where it took hours). Catching this problem is not always easy 
> because the condition is rare and the symptom just tends to be a long period 
> of inactivity in the consumer group which gradually gets worse over time. It 
> may therefore be useful to have a broker metric for the load time so that it 
> can be monitored and potentially alerted on. The same goes for the 
> transaction log.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8629) Kafka Streams Apps to support small native images through GraalVM

2019-07-08 Thread Andy Muir (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880711#comment-16880711
 ] 

Andy Muir commented on KAFKA-8629:
--

I've blogged about my efforts so far here: 
[https://muirandy.wordpress.com/2019/07/08/kafka-streaming-on-graalvm/]

The following findings might be of interest - after all the goal was to reduce 
the required resources of a small Kafka Streams app:

 
|*Running on Mac*|
|| ||Arguments||Memory Usage||Physical Footprint||CPU Usage||
|JVM|-Xmx48m|Real: 370MB; Private: 337MB; Shared: 25MB|343M|0.6%->3.7%|
|GraalVM Native Image|-Xmx48m|Real: 22MB; Private: 8MB; Shared: 1MB|10M|0.4%|

 
|*Running on Docker*|
|| ||Arguments||Docker Image Size||Memory Usage||CPU Usage||
|JVM|-Xmx48m|114MB|73MiB|1.5%->6.8%|
|GraalVM Native Image|-Xmx48m|32.5MB|8MiB|1.5%|

 

 

 

> Kafka Streams Apps to support small native images through GraalVM
> -
>
> Key: KAFKA-8629
> URL: https://issues.apache.org/jira/browse/KAFKA-8629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
> Environment: OSX
> Linux on Docker
>Reporter: Andy Muir
>Priority: Minor
>
> I'm investigating using [GraalVM|http://example.com/] to help with reducing 
> docker image size and required resources for a simple Kafka Streams 
> microservice. To this end, I'm looking at running a microservice which:
> 1) consumes from a Kafka topic (XML)
> 2) Transforms into JSON
> 3) Produces to a new Kafka topic.
> The Kafka Streams app running in the JVM works fine.
> When I attempt to build it to a GraalVM native image (binary executable which 
> does not require the JVM, hence smaller image size and less resources), I 
> encountered a few 
> [incompatibilities|https://github.com/oracle/graal/blob/master/substratevm/LIMITATIONS.md]
>  with the source code in Kafka.
> I've implemented a workaround for each of these in a fork (link to follow) to 
> help establish if it is feasible. I don't intend (at this stage) for the 
> changes to be applied to the broker - I'm only after Kafka Streams for now. 
> I'm not sure whether it'd be a good idea for the broker itself to run as a 
> native image!
> There were 2 issues getting the native image built with Kafka Streams:
> 1) Some Reflection use cases using MethodHandle
> 2) Anything JMX
> To work around these issues, I have:
> 1) Replaced use of MethodHandle with alternatives
> 2) Commented out the JMX code in a few places
> While the first may be sustainable, I'd expect the 2nd option to be 
> put behind a configuration switch that keeps the existing code as the 
> default and turns off JMX only if configured.
> *I haven't created a PR for now, as I'd like feedback to decide if it is 
> going to be feasible to carry this forwards.*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8637) WriteBatch objects leak off-heap memory

2019-07-08 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8637:
---
Description: In 2.1 we did some refactoring that led to the WriteBatch 
objects in RocksDBSegmentedBytesStore#restoreAllInternal being created in a 
separate method, rather than in a try-with-resources statement as used 
elsewhere. This causes a memory leak as the WriteBatches are no longer closed 
automatically  (was: In 2.1 we did some refactoring that led to the WriteBatch 
objects in #restoreAllInternal being created in a separate method, rather than 
in a try-with-resources statement. This causes a memory leak as the WriteBatchs 
are no longer closed automatically)

> WriteBatch objects leak off-heap memory
> ---
>
> Key: KAFKA-8637
> URL: https://issues.apache.org/jira/browse/KAFKA-8637
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> In 2.1 we did some refactoring that led to the WriteBatch objects in 
> RocksDBSegmentedBytesStore#restoreAllInternal being created in a separate 
> method, rather than in a try-with-resources statement as used elsewhere. This 
> causes a memory leak as the WriteBatches are no longer closed automatically



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8637) WriteBatch objects leak off-heap memory

2019-07-08 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8637:
--

 Summary: WriteBatch objects leak off-heap memory
 Key: KAFKA-8637
 URL: https://issues.apache.org/jira/browse/KAFKA-8637
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.0
Reporter: Sophie Blee-Goldman


In 2.1 we did some refactoring that led to the WriteBatch objects in 
#restoreAllInternal being created in a separate method, rather than in a 
try-with-resources statement. This causes a memory leak as the WriteBatches are 
no longer closed automatically



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8636) Document behavior change for static members with `max.poll.interval.ms`

2019-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880642#comment-16880642
 ] 

ASF GitHub Bot commented on KAFKA-8636:
---

abbccdda commented on pull request #7048: KAFKA-8636: add documentation change 
for max poll interval with static members
URL: https://github.com/apache/kafka/pull/7048
 
 
   As title, static members' behavior under `max.poll.interval.ms` is slightly 
different from dynamic members.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Document behavior change for static members with `max.poll.interval.ms`
> ---
>
> Key: KAFKA-8636
> URL: https://issues.apache.org/jira/browse/KAFKA-8636
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> A static member's behavior with max poll interval potentially differs from what 
> the current config documents describe. For example, if session timeout >> max poll 
> interval, static members will not leave the group until the session timeout.
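As a rough illustration of the configs involved (the consumer property names are
the standard ones; the exact documented semantics are what this ticket is about),
a static member might be set up like this:
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMemberExample {

  public static KafkaConsumer<String, String> newStaticConsumer() {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    // group.instance.id makes this consumer a static member (since 2.3)
    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1");
    // session timeout >> max poll interval: per this ticket, the static member is
    // only removed from the group when the session times out (the broker must
    // allow this value via group.max.session.timeout.ms)
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 600_000);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new KafkaConsumer<>(props);
  }
}
{code}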



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8636) Document behavior change for static members with `max.poll.interval.ms`

2019-07-08 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8636:
--

 Summary: Document behavior change for static members with 
`max.poll.interval.ms`
 Key: KAFKA-8636
 URL: https://issues.apache.org/jira/browse/KAFKA-8636
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen


A static member's behavior with max poll interval potentially differs from what 
the current config documents describe. For example, if session timeout >> max poll 
interval, static members will not leave the group until the session timeout.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6263) Expose metric for group metadata loading duration

2019-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880501#comment-16880501
 ] 

ASF GitHub Bot commented on KAFKA-6263:
---

anatasiavela commented on pull request #7045: KAFKA-6263: Expose metrics for 
group and transaction metadata loading duration
URL: https://github.com/apache/kafka/pull/7045
 
 
   [JIRA](https://issues.apache.org/jira/browse/KAFKA-6263)
   
   - Add metrics to provide visibility for how long group metadata and 
transaction metadata take to load in order to understand some inactivity seen 
in the consumer groups
   - Tests include mocking load times by creating a delay after each is loaded 
and ensuring the measured JMX metric is as it should be
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Expose metric for group metadata loading duration
> -
>
> Key: KAFKA-6263
> URL: https://issues.apache.org/jira/browse/KAFKA-6263
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Anastasia Vela
>Priority: Major
>  Labels: needs-kip
>
> We have seen several cases where, because the log cleaner either wasn't enabled or 
> had experienced some failure, __consumer_offsets partitions grew 
> excessively. When one of these partitions changes leadership, the new 
> coordinator must load the offset cache from the start of the log, which can 
> take arbitrarily long depending on how large the partition has grown (we have 
> seen cases where it took hours). Catching this problem is not always easy 
> because the condition is rare and the symptom just tends to be a long period 
> of inactivity in the consumer group which gradually gets worse over time. It 
> may therefore be useful to have a broker metric for the load time so that it 
> can be monitored and potentially alerted on. The same goes for the 
> transaction log.
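As a rough sketch of the kind of metric being asked for here, using the public
org.apache.kafka.common.metrics API (the actual broker implementation, metric
group and metric names in the linked PR may differ):
{code:java}
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class PartitionLoadTimeMetricSketch {

  private final Metrics metrics = new Metrics();
  private final Sensor loadTimeSensor;

  public PartitionLoadTimeMetricSketch() {
    loadTimeSensor = metrics.sensor("partition-load-time");
    loadTimeSensor.add(
        metrics.metricName("partition-load-time-max", "group-coordinator-metrics",
            "Max time in ms to load offsets and group metadata from a __consumer_offsets partition"),
        new Max());
    loadTimeSensor.add(
        metrics.metricName("partition-load-time-avg", "group-coordinator-metrics",
            "Avg time in ms to load offsets and group metadata from a __consumer_offsets partition"),
        new Avg());
  }

  // Wrap the (potentially hours-long) load and record its duration so it can be
  // monitored and alerted on.
  public void timeLoad(final Runnable doLoad) {
    final long startNs = System.nanoTime();
    doLoad.run();
    loadTimeSensor.record(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs));
  }
}
{code}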



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8635) Unnecessary wait when looking up coordinator before transactional request

2019-07-08 Thread Bob Barrett (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Barrett reassigned KAFKA-8635:
--

Assignee: Bob Barrett

> Unnecessary wait when looking up coordinator before transactional request
> -
>
> Key: KAFKA-8635
> URL: https://issues.apache.org/jira/browse/KAFKA-8635
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Denis Washington
>Assignee: Bob Barrett
>Priority: Major
>
> In our Kafka Streams applications (with EOS enabled), we were seeing 
> mysterious long delays between records being produced by a stream task and 
> the same records being consumed by the next task. These delays turned out to 
> always be around {{retry.backoff.ms}} long; reducing that value reduced the 
> delays by about the same amount.
> After digging further, I pinned down the problem to the following lines in 
> {{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:
> {{} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {}}
>  {{    // as long as there are outstanding transactional requests, we simply wait for them to return}}
>  {{    client.poll(retryBackoffMs, time.milliseconds());}}
>  {{    return;}}
>  {{}}}
> This code seems to assume that, if {{maybeSendTransactionalRequest}} returns 
> true, a transactional request has been sent out that should be waited for. 
> However, this is not true if the request requires a coordinator lookup:
> {{if (nextRequestHandler.needsCoordinator()) {}}
> {{    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());}}
> {{    if (targetNode == null) {}}
> {{        transactionManager.lookupCoordinator(nextRequestHandler);}}
> {{        break;}}
> {{    }}}
> {{    ...}}
> {{lookupCoordinator()}} does not actually send anything, but just enqueues a 
> coordinator lookup request for the {{Sender}}'s next run loop iteration. 
> {{maybeSendTransactionalRequest}} still returns true, though (the {{break}} 
> jumps to a {{return true}} at the end of the method), leading the {{Sender}} 
> to needlessly wait via {{client.poll()}} although there is actually no 
> request in-flight.
> I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if 
> it merely enqueues the coordinator lookup instead of actually sending 
> anything. But I'm not sure, hence the bug report instead of a pull request.
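A rough sketch of that suggested fix, paraphrasing the snippet quoted above
(simplified; this is not the actual {{Sender}} code and the surrounding method is
omitted): when only a coordinator lookup is enqueued, report that nothing was sent
so the caller does not block in {{client.poll(retryBackoffMs, ...)}}.
{code:java}
if (nextRequestHandler.needsCoordinator()) {
    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
    if (targetNode == null) {
        transactionManager.lookupCoordinator(nextRequestHandler);
        // return false instead of break: nothing was sent, so the next run loop
        // iteration can perform the lookup without waiting retry.backoff.ms first
        return false;
    }
    // ... otherwise proceed to send the transactional request to targetNode
}
{code}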



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-07-08 Thread Di Campo (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880482#comment-16880482
 ] 

Di Campo commented on KAFKA-5998:
-

Thanks Patrik.
So, if for some reason an application is stopped for longer than 10 minutes, 
does it mean that on restart, this cleaner may clean that previous state, or is 
this prevented?

Also - if a task for a partition is moved to another instance - is the state copied from 
the instance where that task was (if there was any), or is it rebuilt by reading from 
the Kafka topics?

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Assignee: Bill Bejeck
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, 
> exc.txt, props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. It has been running 
> for two days under a load of around 2500 msgs per second. On the third day I am 
> getting the below exception for some of the partitions; I have 16 partitions and only 
> 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  

[jira] [Comment Edited] (KAFKA-8624) Server-side Kafka is version kafka_2.11-1.0.0.2; when a client sends messages to a topic, the exception below is thrown. Is the client version inconsistent with the server version?

2019-07-08 Thread 李志涛


[ 
https://issues.apache.org/jira/browse/KAFKA-8624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878387#comment-16878387
 ] 

Zhitao Li(李志涛) edited comment on KAFKA-8624 at 7/8/19 3:35 PM:
---

!image-2019-07-04-14-56-24-216.png|width=560,height=380!

There are two ways to solve this problem:
 One: downgrade the high-version producer to 0.9.x; another: upgrade the low-version consumer 
to 1.0.x so that it supports the 'MAGIC_VALUE_V2' format of 'RecordBatch'.


was (Author: lizhitao):
!image-2019-07-04-14-56-24-216.png|width=560,height=380!

There are two ways to solve this problem:
 One the hig producer version degrade to 0.9.x,another the low consumer version 
upgrade to 1.0.x which enabled it supported to 'MAGIC_VALUE_V2' of 'RecordBatch'

> Server-side Kafka is version kafka_2.11-1.0.0.2; when a client sends messages to a topic, the exception below is thrown. Is the client version inconsistent with the server version?
> 
>
> Key: KAFKA-8624
> URL: https://issues.apache.org/jira/browse/KAFKA-8624
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 1.0.0
>Reporter: CHARELS
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.1.0
>
> Attachments: image-2019-07-04-00-18-15-781.png, 
> image-2019-07-04-14-49-47-802.png, image-2019-07-04-14-56-24-216.png, 李志涛.jpg
>
>   Original Estimate: 12h
>  Remaining Estimate: 12h
>
> ERROR [KafkaApi-1004] Error when handling request 
> \{replica_id=-1,max_wait_time=1,min_bytes=0,topics=[{topic=ETE,partitions=[{partition=3,fetch_offset=119224671,max_bytes=1048576}]}]}
>  (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: Magic v0 does not support record headers
>  at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
>  at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
>  at 
> org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
>  at 
> org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
>  at 
> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:519)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:517)
>  at scala.Option.map(Option.scala:146)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:517)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:507)
>  at scala.Option.flatMap(Option.scala:171)
>  at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:507)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:554)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:554)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:568)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:568)
>  at 
> kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2033)
>  at 
> kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54)
>  at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2032)
>  at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:568)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:587)
>  at 
> kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:176)
>  at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:586)
>  at 
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:603)
>  at 
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:603)
>  at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
>  at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:595)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:99)
>  at 

[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-08 Thread Denis Washington (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880227#comment-16880227
 ] 

Denis Washington commented on KAFKA-8630:
-

Looks like the {{InMemoryWindowStore}} needs the extra methods of the 
{{InternalProcessorContext}} interface only for recording metrics. Perhaps it 
could just surround those blocks with {{if (context instanceof 
InternalProcessorContext)}}, but otherwise also accept other 
{{ProcessorContext}} implementations like {{MockProcessorContext}}.
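A rough sketch of that guard (the class name, field handling and sensor setup here
are placeholders, not the actual {{InMemoryWindowStore}} code):
{code:java}
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;

public class GuardedInitSketch {

  public void init(final ProcessorContext context, final StateStore root) {
    if (context instanceof InternalProcessorContext) {
      // Only the internal context exposes StreamsMetricsImpl; the expired-record
      // sensor would be registered here.
      final InternalProcessorContext internalContext = (InternalProcessorContext) context;
      internalContext.metrics();
    }
    // Registering the store and restoring records does not need the internal
    // context, so a MockProcessorContext would work for the rest of init().
    context.register(root, (key, value) -> { });
  }
}
{code}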

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor {
>   private ProcessorContext context;
>   private WindowStore windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
> at 
> 

[jira] [Commented] (KAFKA-8345) Create an Administrative API for Replica Reassignment

2019-07-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880195#comment-16880195
 ] 

ASF GitHub Bot commented on KAFKA-8345:
---

stanislavkozlovski commented on pull request #7041: KAFKA-8345: Add an Admin 
API for partition reassignment (KIP-455)
URL: https://github.com/apache/kafka/pull/7041
 
 
   This PR implements the changes needed for [KIP-455: Create an Administrative 
API for Replica 
Reassignment](https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment)
   
   The changes here are heavily influenced by 
https://github.com/apache/kafka/pull/6955. We synced offline with Colin and 
decided I'll continue the work here.
   There was some missing stuff from the existing commits there, so I'll be 
re-creating them, instead of cherry-picking.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Create an Administrative API for Replica Reassignment
> -
>
> Key: KAFKA-8345
> URL: https://issues.apache.org/jira/browse/KAFKA-8345
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
>
> Create an Administrative API for Replica Reassignment, as discussed in KIP-455



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8635) Unnecessary wait when looking up coordinator before transactional request

2019-07-08 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880162#comment-16880162
 ] 

Ismael Juma commented on KAFKA-8635:


Nice investigation, thanks for the report. cc [~hachikuji] [~bob-barrett]

> Unnecessary wait when looking up coordinator before transactional request
> -
>
> Key: KAFKA-8635
> URL: https://issues.apache.org/jira/browse/KAFKA-8635
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Denis Washington
>Priority: Major
>
> In our Kafka Streams applications (with EOS enabled), we were seeing 
> mysterious long delays between records being produced by a stream task and 
> the same records being consumed by the next task. These delays turned out to 
> always be around {{retry.backoff.ms}} long; reducing that value reduced the 
> delays by about the same amount.
> After digging further, I pinned down the problem to the following lines in 
> {{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:
> {{} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {}}
>  {{    // as long as there are outstanding transactional requests, we simply wait for them to return}}
>  {{    client.poll(retryBackoffMs, time.milliseconds());}}
>  {{    return;}}
>  {{}}}
> This code seems to assume that, if {{maybeSendTransactionalRequest}} returns 
> true, a transactional request has been sent out that should be waited for. 
> However, this is not true if the request requires a coordinator lookup:
> {{if (nextRequestHandler.needsCoordinator()) {}}
> {{    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());}}
> {{    if (targetNode == null) {}}
> {{        transactionManager.lookupCoordinator(nextRequestHandler);}}
> {{        break;}}
> {{    }}}
> {{    ...}}
> {{lookupCoordinator()}} does not actually send anything, but just enqueues a 
> coordinator lookup request for the {{Sender}}'s next run loop iteration. 
> {{maybeSendTransactionalRequest}} still returns true, though (the {{break}} 
> jumps to a {{return true}} at the end of the method), leading the {{Sender}} 
> to needlessly wait via {{client.poll()}} although there is actually no 
> request in-flight.
> I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if 
> it merely enqueues the coordinator lookup instead of actually sending 
> anything. But I'm not sure, hence the bug report instead of a pull request.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8635) Unnecessary wait when looking up coordinator before transactional request

2019-07-08 Thread Denis Washington (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Denis Washington updated KAFKA-8635:

Description: 
In our Kafka Streams applications (with EOS enabled), we were seeing mysterious 
long delays between records being produced by a stream task and the same 
records being consumed by the next task. These delays turned out to always be 
around {{retry.backoff.ms}} long; reducing that value reduced the delays by 
about the same amount.

After digging further, I pinned down the problem to the following lines in 
{{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

{{} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {}}
{{    // as long as there are outstanding transactional requests, we simply wait for them to return}}
{{    client.poll(retryBackoffMs, time.milliseconds());}}
{{    return;}}
{{}}}

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns 
true, a transactional request has been sent out that should be waited for. 
However, this is not true if the request requires a coordinator lookup:

{{if (nextRequestHandler.needsCoordinator()) {}}
{{    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());}}
{{    if (targetNode == null) {}}
{{        transactionManager.lookupCoordinator(nextRequestHandler);}}
{{        break;}}
{{    }}}
{{    ...}}

{{lookupCoordinator()}} does not actually send anything, but just enqueues a 
coordinator lookup request for the {{Sender}}'s next run loop iteration. 
{{maybeSendTransactionalRequest}} still returns true, though (the {{break}} 
jumps to a {{return true}} at the end of the method), leading the {{Sender}} to 
needlessly wait via {{client.poll()}} although there is actually no request 
in-flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if 
it merely enqueues the coordinator lookup instead of actually sending anything. 
But I'm not sure, hence the bug report instead of a pull request.

  was:
In our Kafka Streams applications (with EOS enabled), we were seeing mysterious 
long delays between records being produced by a stream task and the same 
records being consumed by the next task. These delays turned out to always be 
around {{retry.backoff.ms}} long; reducing that value reduced the delays by 
about the same amount.

After digging further, I pinned down the problem to the following lines in 
{{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

{{} else if (transactionManager.hasInFlightTransactionalRequest() || 
maybeSendTransactionalRequest()) {}}
{{ // as long as there are outstanding transactional requests, we simply 
wait for them to return
{{ client.poll(retryBackoffMs, time.milliseconds());}}
{{ return;}}
{{}}}

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns 
true, a transactional request has been sent out that should be waited for. 
However, this is not true if the request requires a coordinator lookup:

{{if (nextRequestHandler.needsCoordinator()) {}}
 {{    targetNode = 
transactionManager.coordinator(nextRequestHandler.coordinatorType());}}
 {{    if (targetNode == null) {}}
 \{{         transactionManager.lookupCoordinator(nextRequestHandler); 
 \{{     break;}}
 \{{    }}}
 {{    ...}}

{{lookupCoordinator()}} does not actually send anything, but just enqueues a 
coordinator lookup request for the {{Sender}}'s next run loop iteration. 
{{maybeSendTransactionalRequest}} still returns true, though (the {{break}} 
jumps to a {{return true}} at the end of the method), leading the {{Sender}} to 
needlessly wait via {{client.poll()}} although there is actually no request 
in-flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if 
it merely enqueues the coordinator lookup instead of actually sending anything. 
But I'm not sure, hence the bug report instead of a pull request.


> Unnecessary wait when looking up coordinator before transactional request
> -
>
> Key: KAFKA-8635
> URL: https://issues.apache.org/jira/browse/KAFKA-8635
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Denis Washington
>Priority: Major
>
> In our Kafka Streams applications (with EOS enabled), we were seeing 
> mysterious long delays between records being produced by a stream task and 
> the same records being consumed by the next task. These delays turned out to 
> always be around {{retry.backoff.ms}} long; reducing that value reduced the 
> delays by about the same amount.
> After digging further, I pinned down the problem to the following lines in 
> 

[jira] [Updated] (KAFKA-8635) Unnecessary wait when looking up coordinator before transactional request

2019-07-08 Thread Denis Washington (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Denis Washington updated KAFKA-8635:

Description: 
In our Kafka Streams applications (with EOS enabled), we were seeing mysterious 
long delays between records being produced by a stream task and the same 
records being consumed by the next task. These delays turned out to always be 
around {{retry.backoff.ms}} long; reducing that value reduced the delays by 
about the same amount.

After digging further, I pinned down the problem to the following lines in 
{{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

{{} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {}}
{{    // as long as there are outstanding transactional requests, we simply wait for them to return}}
{{    client.poll(retryBackoffMs, time.milliseconds());}}
{{    return;}}
{{}}}

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns 
true, a transactional request has been sent out that should be waited for. 
However, this is not true if the request requires a coordinator lookup:

{{if (nextRequestHandler.needsCoordinator()) {}}
{{    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());}}
{{    if (targetNode == null) {}}
{{        transactionManager.lookupCoordinator(nextRequestHandler);}}
{{        break;}}
{{    }}}
{{    ...}}

{{lookupCoordinator()}} does not actually send anything, but just enqueues a 
coordinator lookup request for the {{Sender}}'s next run loop iteration. 
{{maybeSendTransactionalRequest}} still returns true, though (the {{break}} 
jumps to a {{return true}} at the end of the method), leading the {{Sender}} to 
needlessly wait via {{client.poll()}} although there is actually no request 
in-flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if 
it merely enqueues the coordinator lookup instead of actually sending anything. 
But I'm not sure, hence the bug report instead of a pull request.

  was:
In our Kafka Streams applications (with EOS enabled), we were seeing mysterious 
long delays between records being produced by a stream task and the same 
records being consumed by the next task. These delays turned out to always be 
around {{retry.backoff.ms}} long; reducing that value reduced the delays by 
about the same amount.

After digging further, I pinned down the problem to the following lines in 
{{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

{{} else if (transactionManager.hasInFlightTransactionalRequest() || 
maybeSendTransactionalRequest()) {}}
{{  // as long as there are outstanding transactional requests, we simply 
wait for them to return}}
{{ client.poll(retryBackoffMs, time.milliseconds());}}
{{ return;}}
{{ }}}

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns 
true, a transactional request has been sent out that should be waited for. 
However, this is not true if the request requires a coordinator lookup:

{{if (nextRequestHandler.needsCoordinator()) {}}
{{    targetNode = 
transactionManager.coordinator(nextRequestHandler.coordinatorType());}}
{{    if (targetNode == null) {}}
{{         transactionManager.lookupCoordinator(nextRequestHandler); 
{{     break;}}
{{    }}}
{{    ...}}

{{lookupCoordinator()}} does not actually send anything, but just enqueues a 
coordinator lookup request for the {{Sender}}'s next run loop iteration. 
{{maybeSendTransactionalRequest}} still returns true, though (the {{break}} 
jumps to a {{return true}} at the end of the method), leading the {{Sender}} to 
needlessly wait via {{client.poll()}} although there is actually no request 
in-flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if 
it merely enqueues the coordinator lookup instead of actually sending anything. 
But I'm not sure, hence the bug report instead of a pull request.


> Unnecessary wait when looking up coordinator before transactional request
> -
>
> Key: KAFKA-8635
> URL: https://issues.apache.org/jira/browse/KAFKA-8635
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Denis Washington
>Priority: Major
>
> In our Kafka Streams applications (with EOS enabled), we were seeing 
> mysterious long delays between records being produced by a stream task and 
> the same records being consumed by the next task. These delays turned out to 
> always be around {{retry.backoff.ms}} long; reducing that value reduced the 
> delays by about the same amount.
> After digging further, I pinned down the problem to the following lines in 
> 

[jira] [Created] (KAFKA-8635) Unnecessary wait when looking up coordinator before transactional request

2019-07-08 Thread Denis Washington (JIRA)
Denis Washington created KAFKA-8635:
---

 Summary: Unnecessary wait when looking up coordinator before 
transactional request
 Key: KAFKA-8635
 URL: https://issues.apache.org/jira/browse/KAFKA-8635
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.2.1, 2.3.0
Reporter: Denis Washington


In our Kafka Streams applications (with EOS enabled), we were seeing mysterious 
long delays between records being produced by a stream task and the same 
records being consumed by the next task. These delays turned out to always be 
around {{retry.backoff.ms}} long; reducing that value reduced the delays by 
about the same amount.

After digging further, I pinned down the problem to the following lines in 
{{org.apache.kafka.clients.producer.internals.Sender#runOnce}}:

{{} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {}}
{{    // as long as there are outstanding transactional requests, we simply wait for them to return}}
{{    client.poll(retryBackoffMs, time.milliseconds());}}
{{    return;}}
{{}}}

This code seems to assume that, if {{maybeSendTransactionalRequest}} returns 
true, a transactional request has been sent out that should be waited for. 
However, this is not true if the request requires a coordinator lookup:

{{if (nextRequestHandler.needsCoordinator()) {}}
{{    targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());}}
{{    if (targetNode == null) {}}
{{        transactionManager.lookupCoordinator(nextRequestHandler);}}
{{        break;}}
{{    }}}
{{    ...}}

{{lookupCoordinator()}} does not actually send anything, but just enqueues a 
coordinator lookup request for the {{Sender}}'s next run loop iteration. 
{{maybeSendTransactionalRequest}} still returns true, though (the {{break}} 
jumps to a {{return true}} at the end of the method), leading the {{Sender}} to 
needlessly wait via {{client.poll()}} although there is actually no request 
in-flight.

I _think_ the fix is to let {{maybeSendTransactionalRequest}} return false if 
it merely enqueues the coordinator lookup instead of actually sending anything. 
But I'm not sure, hence the bug report instead of a pull request.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8634) Update ZooKeeper to 3.5.5

2019-07-08 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-8634:
---
Fix Version/s: 2.4.0

> Update ZooKeeper to 3.5.5
> -
>
> Key: KAFKA-8634
> URL: https://issues.apache.org/jira/browse/KAFKA-8634
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.4.0
>
>
> ZooKeeper 3.5.5 is the first stable release in the 3.5.x series. The key new 
> feature in ZK 3.5.x is TLS support.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7178) Is kafka compatible with zookeeper 3.5.x ?

2019-07-08 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-7178.

Resolution: Duplicate

KAFKA-8634 tracks updating to ZK 3.5.x.

> Is kafka compatible with zookeeper 3.5.x ?
> --
>
> Key: KAFKA-7178
> URL: https://issues.apache.org/jira/browse/KAFKA-7178
> Project: Kafka
>  Issue Type: Improvement
>  Components: zkclient
>Reporter: hackerwin7
>Priority: Major
>
> Hi, all
> I want to know whether Kafka versions (0.8.x, 0.9.x, 0.10.x, 0.11.x, 1.x) are compatible 
> with ZooKeeper 3.5.x and its dynamic reconfiguration feature?
> some refs on here: 
> https://serverfault.com/questions/854650/kafka-compatible-with-zookeeper-3-5-feature-rebalancing-client-connections



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8634) Update ZooKeeper to 3.5.5

2019-07-08 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-8634:
--

 Summary: Update ZooKeeper to 3.5.5
 Key: KAFKA-8634
 URL: https://issues.apache.org/jira/browse/KAFKA-8634
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma


ZooKeeper 3.5.5 is the first stable release in the 3.5.x series. The key new 
feature in ZK 3.5.x is TLS support.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2019-07-08 Thread Sebastjanas Vaisovas (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880134#comment-16880134
 ] 

Sebastjanas Vaisovas commented on KAFKA-7447:
-

I experience the same on 2.0.0. Any plans to fix this?

> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> --
>
> Key: KAFKA-7447
> URL: https://issues.apache.org/jira/browse/KAFKA-7447
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Ben Isaacs
>Priority: Major
>
> *Summary:*
>  * When 1 of my 3 brokers is cleanly shut down, consumption and production 
> continue as normal due to replication. (Consumers are rebalanced to the 
> replicas, and producers are rebalanced to the remaining brokers). However, 
> when the cleanly-shut-down broker comes back, after about 10 minutes, a 
> flurry of production errors occur and my consumers suddenly go back in time 2 
> weeks, causing a long outage (12 hours+) as all messages are replayed on some 
> topics.
>  * The hypothesis is that the auto-leadership-rebalance is happening too 
> quickly after the downed broker returns, before it has had a chance to become 
> fully synchronised on all partitions. In particular, it seems that having 
> consumer offsets ahead of the most recent data on the topic that the consumer was 
> following causes the consumer to be reset to 0.
> *Expected:*
>  * bringing a node back from a clean shut down does not cause any consumers 
> to reset to 0.
> *Actual:*
>  * I experience approximately 12 hours of partial outage triggered at the 
> point that auto leadership rebalance occurs, after a cleanly shut down node 
> returns.
> *Workaround:*
>  * disable auto leadership rebalance entirely. 
>  * manually rebalance it from time to time when all nodes and all partitions 
> are fully replicated.
> *My Setup:*
>  * Kafka deployment with 3 brokers and 2 topics.
>  * Replication factor is 3, for all topics.
>  * min.isr is 2, for all topics.
>  * Zookeeper deployment with 3 instances.
>  * In the region of 10 to 15 consumers, with 2 user topics (and, of course, 
> the system topics such as consumer offsets). Consumer offsets has the 
> standard 50 partitions. The user topics have about 3000 partitions in total.
>  * Offset retention time of 7 days, and topic retention time of 14 days.
>  * Input rate ~1000 messages/sec.
>  * Deployment happens to be on Google compute engine.
> *Related Stack Overflow Post:*
> https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker
> It was suggested I open a ticket by "Muir", who says they have also 
> experienced this.
> *Transcription of logs, showing the problem:*
> Below, you can see chronologically sorted, interleaved, logs from the 3 
> brokers. prod-kafka-2 is the node which was cleanly shut down and then 
> restarted. I filtered the messages only to those regarding 
> __consumer_offsets-29 because it's just too much to paste, otherwise.
> ||Broker host||Broker ID||
> |prod-kafka-1|0|
> |prod-kafka-2|1 (this one was restarted)|
> |prod-kafka-3|2|
> prod-kafka-2: (just starting up)
> {code}
> [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: (sees replica1 come back)
> {code}
> [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] 
> Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
> {code}
> prod-kafka-2:
> {code}
> [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] 
> __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous 
> Leader Epoch was: 77 (kafka.cluster.Partition)
>  [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling 
> loading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,288] INFO [GroupMetadataManager brokerId=1] Finished 
> loading offsets and group 

[jira] [Commented] (KAFKA-7563) Single broker sends incorrect metadata for topic partitions

2019-07-08 Thread Martin Kamp Jensen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880079#comment-16880079
 ] 

Martin Kamp Jensen commented on KAFKA-7563:
---

After upgrading to 2.3.0 clients (and currently a 2.2.1 broker) we have not 
reproduced this error in a couple of hundred runs.

> Single broker sends incorrect metadata for topic partitions
> ---
>
> Key: KAFKA-7563
> URL: https://issues.apache.org/jira/browse/KAFKA-7563
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Martin Kamp Jensen
>Priority: Major
> Attachments: kafka.log, zookeeper.log
>
>
> When starting our Kafka Streams application in a test setup with just one 
> Kafka broker we are seeing the following error roughly 1 out of 15 runs:
> {{StreamsException: Existing internal topic 
> alarm-message-streams-alarm-from-unknown-asset-changelog has invalid 
> partitions: expected: 32; actual: 25. Use 'kafka.tools.StreamsResetter' tool 
> to clean up invalid topics before processing.}}
> (Note: It is not always the same topic that causes the error.)
> When we see the error above the actual number of partitions varies (expected 
> is 32, actual is above 0 and below 32).
> Before each test run the Kafka broker is started without data (using 
> [https://hub.docker.com/r/wurstmeister/kafka/]).
> We have never seen this happen in non-test where we are running with 6 Kafka 
> brokers. However, we are running a significantly higher number of test runs 
> than deploys to non-test.
> After some investigation (including using AdminClient to describe the topics 
> when the Kafka Streams application got the StreamsException and confirming 
> that AdminClient also reports that a topic has the wrong number of 
> partitions!) we implemented the following workaround: When the Kafka Streams 
> application fails with the exception, we stop the application, stop the Kafka 
> broker, start the Kafka broker, and finally start the application. Then the 
> exception is not thrown. Of course this does not explain or fix the real 
> issue at hand but it is still important because we all hate flaky tests.
> Kafka and ZooKeeper log files from a run where the exception above occurred 
> and where applying the workaround described above enabled us to continue 
> without the exception are attached.
> This issue was created by request of Matthias J. Sax at 
> https://stackoverflow.com/questions/52943653/existing-internal-topic-has-invalid-partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2019-07-08 Thread Chris Toomey (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880077#comment-16880077
 ] 

Chris Toomey commented on KAFKA-4212:
-

[~mjsax] there are at least 3 different teams (I'm working with James) that 
need this simple TTL feature for when KTables are being used as topic caches 
and for which the provided solution is fine.

There may be other use cases, as you suggest, for which this solution isn't the right 
fit, and that's fine, but we can't wait around for somebody to figure out and 
implement the perfect solution for all those use cases plus the one we have 
that this solves.

So let's please agree to accept this change for the good of all the developers 
that share our common use case, document the simple mechanics of how it works, 
and you can add in a bunch of caveats of your use cases that it doesn't 
address. Perhaps then you would like to open a different ticket to address 
those use cases.

thanks.

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.
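As a rough sketch of the repurposing described above (assuming a windowed store
typed as below and a caller-supplied TTL; this is not an official cache API):
{code:java}
import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class WindowStoreTtlCacheSketch {

  // Look the key up only within the last `ttl`; older entries are ignored and are
  // eventually dropped when their segments expire. fetch() returns entries in
  // ascending timestamp order, so the last one seen is the newest value.
  public static String getLatestWithinTtl(final WindowStore<String, String> store,
                                          final String key,
                                          final Duration ttl) {
    final Instant now = Instant.now();
    String latest = null;
    try (WindowStoreIterator<String> iter = store.fetch(key, now.minus(ttl), now)) {
      while (iter.hasNext()) {
        final KeyValue<Long, String> entry = iter.next();
        latest = entry.value;
      }
    }
    return latest;
  }
}
{code}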



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)