[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-26 Thread Sean Guo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146776#comment-17146776
 ] 

Sean Guo commented on KAFKA-10134:
--

We can try the patch next week to see whether it also addresses the issue we 
hit, as we also saw a lot of operations finding the coordinator when this 
issue happened.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during rebalances, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some tasks are long-running, >30s), the CPU goes 
> sky-high. It reads ~700% in our metrics, so several threads must be in a 
> tight loop. We have several consumer threads consuming from different 
> partitions during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager protocol the high CPU usage drops once 
> the rebalance is done, but with the cooperative one the consumer threads seem 
> to be stuck on something and cannot finish the rebalance, so the high CPU 
> usage does not drop until we stop our load. A small load without long-running 
> tasks also does not cause continuous high CPU usage, since the rebalance can 
> finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found that the clients appear to be in a loop 
> finding the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue, so it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #8925: KAFKA-9974: Make produce-sync flush

2020-06-26 Thread GitBox


guozhangwang commented on pull request #8925:
URL: https://github.com/apache/kafka/pull/8925#issuecomment-650487412


   > Could we include the purpose of this PR in the title, such as "Integration 
test shouldApplyUpdatesToStandbyStore fix:... "
   
   SG



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8925: KAFKA-9974: Make produce-sync flush

2020-06-26 Thread GitBox


abbccdda commented on pull request #8925:
URL: https://github.com/apache/kafka/pull/8925#issuecomment-650486850


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650483907


   backported to 2.6, 2.5, and 2.4. I ran the streams and client tests each 
time, as well as systemTestLibs.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10173) BufferUnderflowException during Kafka Streams Upgrade

2020-06-26 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146758#comment-17146758
 ] 

John Roesler commented on KAFKA-10173:
--

Ok, the fix is now backported all the way back to 2.4. I'll leave this ticket 
open until I get the system test PR in.

> BufferUnderflowException during Kafka Streams Upgrade
> -
>
> Key: KAFKA-10173
> URL: https://issues.apache.org/jira/browse/KAFKA-10173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Karsten Schnitter
>Assignee: John Roesler
>Priority: Blocker
>  Labels: suppress
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> I migrated a Kafka Streams application from version 2.3.1 to 2.5.0. I 
> followed the steps described in the upgrade guide and set the property 
> {{migrate.from=2.3}}. On my dev system with just one running instance I got 
> the following exception:
> {noformat}
> stream-thread [0-StreamThread-2] Encountered the following error during 
> processing:
> java.nio.BufferUnderflowException: null
>   at java.base/java.nio.HeapByteBuffer.get(Unknown Source)
>   at java.base/java.nio.ByteBuffer.get(Unknown Source)
>   at 
> org.apache.kafka.streams.state.internals.BufferValue.extractValue(BufferValue.java:94)
>   at 
> org.apache.kafka.streams.state.internals.BufferValue.deserialize(BufferValue.java:83)
>   at 
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:368)
>   at 
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>   at 
> org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350)
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {noformat}
> I figured out that this problem only occurs for stores where I use the 
> suppress feature. If I rename the changelog topics during the migration, the 
> problem does not occur. 
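For reference, a minimal, illustrative sketch of the rolling-bounce setting the 
upgrade guide describes for this kind of migration; note that the Streams config 
key is `upgrade.from`, and the application id and bootstrap servers below are 
placeholders, not taken from this report:
{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class UpgradeFromExample {
    public static void main(final String[] args) {
        // Placeholder application id and bootstrap servers; the point of this
        // sketch is only the upgrade.from setting used during the first bounce.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // Keep the upgraded instances compatible with the 2.3 metadata during
        // the first rolling bounce; remove this setting for the second bounce.
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");
        System.out.println(props);
    }
}
{code}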



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10207) Untrimmed Index files cause premature log segment deletions on startup

2020-06-26 Thread Johnny Malizia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146757#comment-17146757
 ] 

Johnny Malizia commented on KAFKA-10207:


I didn't see where to assign this to me, but I went ahead and submitted a 
change to handle this scenario more gracefully. 
[https://github.com/apache/kafka/pull/8936]

With that said, it does seem to go directly against part of the benefits of 
KIP-263, so I am open to alternatives.

> Untrimmed Index files cause premature log segment deletions on startup
> --
>
> Key: KAFKA-10207
> URL: https://issues.apache.org/jira/browse/KAFKA-10207
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 2.4.0, 2.3.1, 2.4.1
>Reporter: Johnny Malizia
>Priority: Major
>
> [KIP-263|https://cwiki.apache.org/confluence/display/KAFKA/KIP-263%3A+Allow+broker+to+skip+sanity+check+of+inactive+segments+on+broker+startup#KIP263:Allowbrokertoskipsanitycheckofinactivesegmentsonbrokerstartup-Evaluation]
>  appears to have introduced a change explicitly deciding to not call the 
> sanityCheck method on the time or offset index files that are loaded by Kafka 
> at startup. I found a particularly nasty bug using the following configuration
> {code:java}
> jvm=1.8.0_191 zfs=0.6.5.6 kernel=4.4.0-1013-aws kafka=2.4.1{code}
> The bug was that the retention period for a topic, or even the broker-level 
> configuration, seemed not to be respected: no matter what, when the broker 
> started up it would decide that all log segments on disk were breaching the 
> retention window, and the data would be purged away.
>  
> {code:java}
> Found deletable segments with base offsets [11610665,12130396,12650133] due 
> to retention time 8640ms breach {code}
> {code:java}
> Rolled new log segment at offset 12764291 in 1 ms. (kafka.log.Log)
> Scheduling segments for deletion List(LogSegment(baseOffset=11610665, 
> size=1073731621, lastModifiedTime=1592532125000, largestTime=0), 
> LogSegment(baseOffset=12130396, size=1073727967, 
> lastModifiedTime=1592532462000, largestTime=0), 
> LogSegment(baseOffset=12650133, size=235891971, 
> lastModifiedTime=1592532531000, largestTime=0)) {code}
> Further logging showed that this issue was happening when loading the files, 
> indicating the final writes to trim the index were not successful
> {code:java}
> DEBUG Loaded index file 
> /mnt/kafka-logs/test_topic-0/17221277.timeindex with maxEntries = 
> 873813, maxIndexSize = 10485760, entries = 873813, lastOffset = 
> TimestampOffset(0,17221277), file position = 10485756 
> (kafka.log.TimeIndex){code}
>  It looks like the file is initially preallocated (10MB by default) and index 
> entries are added over time. When it is time to roll to a new log segment, the 
> index file is supposed to be trimmed, removing any 0 bytes left at the tail 
> from the initial allocation. But in some cases that does not seem to happen 
> successfully. Because the 0 bytes at the tail may not have been removed, when 
> the index is loaded again after restarting Kafka, the buffer seeks to the end, 
> the next timestamp is 0, and this leads to a premature TTL deletion of the log 
> segments.
>  
> I tracked the issue down to the JVM version being used, as upgrading resolved 
> it, but I think Kafka should never delete data by mistake like this: a rolling 
> restart with this bug in place would cause complete data loss across the 
> cluster.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] Johnny-Malizia opened a new pull request #8936: Fixed padded timeindex causing premature data deletion

2020-06-26 Thread GitBox


Johnny-Malizia opened a new pull request #8936:
URL: https://github.com/apache/kafka/pull/8936


   In some cases when a new log segment is rolled, the previous segment's
   timeindex and offset index do not have the excess 0 bytes trimmed. This
   results in a situation where the next load from disk triggers retention
   window breaches and the data is deleted. This happens because the mmap
   pointing to the index data seeks to the end, and because in some cases there
   are 0 bytes padding the end, the loaded timestamp is 0.
   
   The sanity checks were previously removed explicitly by KIP-263.
   
   A unit test was added to confirm that the sanity check on the time index will 
in fact catch this scenario. There doesn't seem to be existing testing for 
LogSegment.sanityCheck(), so I opted to test the specific functionality and rely 
on the same logic that TransactionIndex currently relies on. 
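   For illustration only (not the actual patch), a rough sketch of the kind of
   check described above, assuming the standard 12-byte time-index entry layout
   (8-byte timestamp plus 4-byte relative offset):
   
   ```java
   import java.io.IOException;
   import java.io.RandomAccessFile;
   
   // Illustrative check: a time index that still carries preallocated zero
   // bytes at its tail reports a last-entry timestamp of 0 even though it has
   // entries, which is what triggers the premature retention breach.
   final class TimeIndexTailCheck {
       private static final int ENTRY_SIZE = 12; // 8-byte timestamp + 4-byte relative offset
   
       static boolean looksUntrimmed(final String path) throws IOException {
           try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
               final long entries = file.length() / ENTRY_SIZE;
               if (entries == 0) {
                   return false; // an empty index has nothing to check
               }
               file.seek((entries - 1) * ENTRY_SIZE);
               final long lastTimestamp = file.readLong();
               // A properly trimmed index does not normally end with a zero
               // timestamp once it has entries, so a zero here suggests the
               // preallocated tail was never removed.
               return lastTimestamp == 0L;
           }
       }
   }
   ```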
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10207) Untrimmed Index files cause premature log segment deletions on startup

2020-06-26 Thread Johnny Malizia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnny Malizia updated KAFKA-10207:
---
Description: 
[KIP-263|https://cwiki.apache.org/confluence/display/KAFKA/KIP-263%3A+Allow+broker+to+skip+sanity+check+of+inactive+segments+on+broker+startup#KIP263:Allowbrokertoskipsanitycheckofinactivesegmentsonbrokerstartup-Evaluation]
 appears to have introduced a change explicitly deciding to not call the 
sanityCheck method on the time or offset index files that are loaded by Kafka 
at startup. I found a particularly nasty bug using the following configuration
{code:java}
jvm=1.8.0_191 zfs=0.6.5.6 kernel=4.4.0-1013-aws kafka=2.4.1{code}
The bug was that the retention period for a topic, or even the broker-level 
configuration, seemed not to be respected: no matter what, when the broker 
started up it would decide that all log segments on disk were breaching the 
retention window, and the data would be purged away.

 
{code:java}
Found deletable segments with base offsets [11610665,12130396,12650133] due to 
retention time 8640ms breach {code}
{code:java}
Rolled new log segment at offset 12764291 in 1 ms. (kafka.log.Log)
Scheduling segments for deletion List(LogSegment(baseOffset=11610665, 
size=1073731621, lastModifiedTime=1592532125000, largestTime=0), 
LogSegment(baseOffset=12130396, size=1073727967, 
lastModifiedTime=1592532462000, largestTime=0), LogSegment(baseOffset=12650133, 
size=235891971, lastModifiedTime=1592532531000, largestTime=0)) {code}
Further logging showed that this issue was happening when loading the files, 
indicating the final writes to trim the index were not successful
{code:java}
DEBUG Loaded index file 
/mnt/kafka-logs/test_topic-0/17221277.timeindex with maxEntries = 
873813, maxIndexSize = 10485760, entries = 873813, lastOffset = 
TimestampOffset(0,17221277), file position = 10485756 
(kafka.log.TimeIndex){code}
 It looks like the file is initially preallocated (10MB by default) and index 
entries are added over time. When it is time to roll to a new log segment, the 
index file is supposed to be trimmed, removing any 0 bytes left at the tail 
from the initial allocation. But in some cases that does not seem to happen 
successfully. Because the 0 bytes at the tail may not have been removed, when 
the index is loaded again after restarting Kafka, the buffer seeks to the end, 
the next timestamp is 0, and this leads to a premature TTL deletion of the log 
segments.

 

I tracked the issue down to the JVM version being used, as upgrading resolved 
it, but I think Kafka should never delete data by mistake like this: a rolling 
restart with this bug in place would cause complete data loss across the 
cluster.

 

  was:
[KIP-263|https://cwiki.apache.org/confluence/display/KAFKA/KIP-263%3A+Allow+broker+to+skip+sanity+check+of+inactive+segments+on+broker+startup#KIP263:Allowbrokertoskipsanitycheckofinactivesegmentsonbrokerstartup-Evaluation]
 appears to have introduced a change explicitly deciding to not call the 
sanityCheck method on the time or offset index files that are loaded by Kafka 
at startup. I found a particularly nasty bug using the following configuration
{code:java}
jvm=1.8.0_191 zfs=0.6.5.6 kernel=4.4.0-1013-aws kafka=2.4.1{code}
The bug was that the retention period for a topic, or even the broker-level 
configuration, seemed not to be respected: no matter what, when the broker 
started up it would decide that all log segments on disk were breaching the 
retention window, and the data would be purged away.

 
{code:java}
Found deletable segments with base offsets [11610665,12130396,12650133] due to 
retention time 8640ms breach {code}
{code:java}
Rolled new log segment at offset 12764291 in 1 ms. (kafka.log.Log)
Scheduling segments for deletion List(LogSegment(baseOffset=11610665, 
size=1073731621, lastModifiedTime=1592532125000, largestTime=0), 
LogSegment(baseOffset=12130396, size=1073727967, 
lastModifiedTime=1592532462000, largestTime=0), LogSegment(baseOffset=12650133, 
size=235891971, lastModifiedTime=1592532531000, largestTime=0)) {code}
Further logging showed that this issue was happening when loading the files, 
indicating the final writes to trim the index were not successful
{code:java}
DEBUG Loaded index file 
/mnt/kafka-logs/test_topic-0/17221277.timeindex with maxEntries = 
873813, maxIndexSize = 10485760, entries = 873813, lastOffset = 
TimestampOffset(0,17221277), file position = 10485756 
(kafka.log.TimeIndex){code}
 

So, because the index leaves the preallocated 0 bytes at the tail, when the 
index is loaded again after restarting Kafka, the next timestamp is 0 and this 
leads to a premature TTL deletion of the log segments.

 

I tracked the issue down to the JVM version being used, as upgrading resolved 
it, but I think Kafka should never 

[jira] [Commented] (KAFKA-10173) BufferUnderflowException during Kafka Streams Upgrade

2020-06-26 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146749#comment-17146749
 ] 

John Roesler commented on KAFKA-10173:
--

Ok, I've just merged the fix ([https://github.com/apache/kafka/pull/8905] ), 
and I'm backporting it now.

To control the scope of the change, I've separated the upgrade system tests out 
for a second PR, which I'll submit next week. I did run them locally, and they 
pass after #8905, so I have confidence the bug is really fixed.

> BufferUnderflowException during Kafka Streams Upgrade
> -
>
> Key: KAFKA-10173
> URL: https://issues.apache.org/jira/browse/KAFKA-10173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Karsten Schnitter
>Assignee: John Roesler
>Priority: Blocker
>  Labels: suppress
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> I migrated a Kafka Streams application from version 2.3.1 to 2.5.0. I 
> followed the steps described in the upgrade guide and set the property 
> {{migrate.from=2.3}}. On my dev system with just one running instance I got 
> the following exception:
> {noformat}
> stream-thread [0-StreamThread-2] Encountered the following error during 
> processing:
> java.nio.BufferUnderflowException: null
>   at java.base/java.nio.HeapByteBuffer.get(Unknown Source)
>   at java.base/java.nio.ByteBuffer.get(Unknown Source)
>   at 
> org.apache.kafka.streams.state.internals.BufferValue.extractValue(BufferValue.java:94)
>   at 
> org.apache.kafka.streams.state.internals.BufferValue.deserialize(BufferValue.java:83)
>   at 
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:368)
>   at 
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>   at 
> org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350)
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {noformat}
> I figured out that this problem only occurs for stores where I use the 
> suppress feature. If I rename the changelog topics during the migration, the 
> problem does not occur. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2020-06-26 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146747#comment-17146747
 ] 

John Roesler commented on KAFKA-8630:
-

Hey, all, I inadvertently started a new ticket for this (KAFKA-10200) and 
submitted a patch already. Sorry if I stepped on any toes.

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Assignee: John Roesler
>Priority: Critical
> Fix For: 2.7.0, 2.6.1
>
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore<String, String>) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore<String, String> store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
> at 
> com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 

[jira] [Assigned] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2020-06-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-8630:
---

Assignee: John Roesler

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Assignee: John Roesler
>Priority: Critical
> Fix For: 2.7.0, 2.6.1
>
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore<String, String>) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore<String, String> store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
> at 
> com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> 

[GitHub] [kafka] vvcephei merged pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei merged pull request #8905:
URL: https://github.com/apache/kafka/pull/8905


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650476597


   Failures were unrelated:
   
   ```
   kafka.api.PlaintextConsumerTest > testLowMaxFetchSizeForRequestAndPartition 
FAILED
   org.scalatest.exceptions.TestFailedException: Timed out before consuming 
expected 2700 records. The number consumed was 1077.
   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
   at org.scalatest.Assertions.fail(Assertions.scala:1091)
   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
   at org.scalatest.Assertions$.fail(Assertions.scala:1389)
   at 
kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:158)
   at 
kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition(PlaintextConsumerTest.scala:804)
   ```
   
   ```
   org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
testReplication FAILED
   java.lang.RuntimeException: Could not find enough records. found 0, 
expected 100
   at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435)
   at 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:217)
   ```
   
   ```
   kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
   ```
   
   ```
   
org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
   ```
   
   ```
   
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10185) Streams should log summarized restoration information at info level

2020-06-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10185.
--
Resolution: Fixed

> Streams should log summarized restoration information at info level
> ---
>
> Key: KAFKA-10185
> URL: https://issues.apache.org/jira/browse/KAFKA-10185
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> Currently, restoration progress is only visible at debug level in the 
> Consumer's Fetcher logs. Users can register a restoration listener and 
> implement their own logging, but it would substantially improve operability 
> to have some logs available at INFO level.
> Logging each partition in each restore batch at info level would be too much, 
> though, so we should print summarized logs at a decreased interval, like 
> every 10 seconds.
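A minimal sketch of the interval-throttled summary logging described above 
(the class name, method, and use of stdout are illustrative; this is not the 
actual Streams implementation):
{code:java}
public final class ThrottledRestoreLogger {
    private static final long LOG_INTERVAL_MS = 10_000L; // summarize roughly every 10 seconds

    private long lastLogMs = 0L;
    private long restoredSinceLastLog = 0L;

    // Called for every restored batch; emits at most one summary line per
    // interval instead of logging each partition and batch individually.
    public void onBatchRestored(final long numRecords, final long nowMs) {
        restoredSinceLastLog += numRecords;
        if (nowMs - lastLogMs >= LOG_INTERVAL_MS) {
            System.out.printf("Restored %d records in the last %d ms%n",
                restoredSinceLastLog, nowMs - lastLogMs);
            lastLogMs = nowMs;
            restoredSinceLastLog = 0L;
        }
    }
}
{code}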



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10185) Streams should log summarized restoration information at info level

2020-06-26 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146739#comment-17146739
 ] 

John Roesler commented on KAFKA-10185:
--

Hey [~rhauch] ,

Sorry, I should have emailed the list when I added this. The fix for this is 
already merged in the 2.6 branch, so I think a fix version of 2.6.0 is correct. 
I just neglected to actually close the ticket. I'm sorry, I know this didn't 
make your job any easier.

Thanks,

-John

> Streams should log summarized restoration information at info level
> ---
>
> Key: KAFKA-10185
> URL: https://issues.apache.org/jira/browse/KAFKA-10185
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> Currently, restoration progress is only visible at debug level in the 
> Consumer's Fetcher logs. Users can register a restoration listener and 
> implement their own logging, but it would substantially improve operability 
> to have some logs available at INFO level.
> Logging each partition in each restore batch at info level would be too much, 
> though, so we should print summarized logs at a decreased interval, like 
> every 10 seconds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10185) Streams should log summarized restoration information at info level

2020-06-26 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10185:
-
Fix Version/s: (was: 2.7.0)
   2.5.1
   2.6.0

> Streams should log summarized restoration information at info level
> ---
>
> Key: KAFKA-10185
> URL: https://issues.apache.org/jira/browse/KAFKA-10185
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> Currently, restoration progress is only visible at debug level in the 
> Consumer's Fetcher logs. Users can register a restoration listener and 
> implement their own logging, but it would substantially improve operability 
> to have some logs available at INFO level.
> Logging each partition in each restore batch at info level would be too much, 
> though, so we should print summarized logs at a decreased interval, like 
> every 10 seconds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #8927: KAFKA-10200: Fix testability of PAPI with windowed stores

2020-06-26 Thread GitBox


vvcephei commented on a change in pull request #8927:
URL: https://github.com/apache/kafka/pull/8927#discussion_r446473287



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+public final class ProcessorContextUtils {

Review comment:
   Uh, no, that was an oversight :) I might as well fix it in this PR, so 
I'll not merge just now. Thanks for pointing it out.
   
   I think where it leaves us, in a nutshell, is that state stores should never 
cast the context, not as long as their `init()` method is a public interface. 
This class is a way to collect all the casting-related sins and put them all in 
one place where we can keep an eye on them.
   
   Casting doesn't always indicate a design failure, but in this case, it 100% 
does. I'd hope that everything in this class gets designed away and we can 
delete the class. I'd flip the table and do it right now, but I don't want to 
block the unit-testability of the PAPI behind a bunch of KIPs. I'll make 
tickets to fix the stuff that needs to be fixed.
   
   In contrast, note that all the Processor implementations that power our DSL 
are very much _not_ for public use, so for them, casting to 
InternalProcessorContext is fair game. Although, I _have_ found and/or fixed a 
fair number of bugs following from casting in the processors as well. So, 
although it's not a contract violation, it still may not be a good idea.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8914: MINOR: Do not swallow exception when collecting PIDs

2020-06-26 Thread GitBox


vvcephei commented on pull request #8914:
URL: https://github.com/apache/kafka/pull/8914#issuecomment-650474359


   Thanks for running the tests.
   
   One test failed. I haven't looked to see if it's important: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-26--001.1593186338--cadonna--donot_swallow_exeption_when_retrieving_pids--911b7a2f0/report.html



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-26 Thread GitBox


vvcephei commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r446472130



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+
+public abstract class AbstractStateManager implements StateManager {

Review comment:
   Thanks! This seems like the kind of debate that will continue for a 
while :) 
   
   Just to share the thought, adding abstraction isn't the only way to 
de-duplicate code. In fact, composition is generally less costly to maintain, 
for example by adding utility methods.
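   For illustration only (these are not Kafka classes), shared logic pulled into
   a small utility instead of an abstract base class might look like this; the
   "<appId>-<store>-changelog" naming is just an example convention:
   
   ```java
   // Illustrative utility: both state-manager implementations can call this
   // helper instead of inheriting it from an abstract superclass.
   final class StateManagerUtils {
       private StateManagerUtils() { }
   
       static String changelogTopicFor(final String applicationId, final String storeName) {
           return applicationId + "-" + storeName + "-changelog";
       }
   }
   ```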





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-26 Thread GitBox


vvcephei commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r446471839



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -103,8 +104,13 @@ public void init(final ProcessorContext context,
 
 @SuppressWarnings("unchecked")
 void initStoreSerde(final ProcessorContext context) {
+final InternalProcessorContext internalProcessorContext = 
(InternalProcessorContext) context;
+final String storeName = name();
+final String changelogTopic = 
internalProcessorContext.changelogFor(storeName);
 serdes = new StateSerdes<>(
-ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name()),
+changelogTopic != null ?
+changelogTopic :
+
ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),

Review comment:
   No problem! I'm glad it proved useful.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-26 Thread GitBox


vvcephei commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r446471736



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
##
@@ -124,18 +141,19 @@ public void before() {
 Serdes.String()
 );
 metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
-expect(context.metrics())
-.andReturn(new StreamsMetricsImpl(metrics, "test", 
builtInMetricsVersion)).anyTimes();
-expect(context.taskId()).andReturn(taskId).anyTimes();
-expect(inner.name()).andReturn("metered").anyTimes();
+expect(context.applicationId()).andStubReturn(APPLICATION_ID);
+expect(context.metrics()).andStubReturn(new 
StreamsMetricsImpl(metrics, "test", builtInMetricsVersion));
+expect(context.taskId()).andStubReturn(taskId);
+
expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);

Review comment:
   Thanks, all. What Sophie mentioned is more in line with what I had in 
mind. The advantage of something like "MIPC" is that it just generally 
satisfies the API contract while not really doing stuff like writing to the 
brokers, etc. Because it's a contract-valid implementation of 
InternalProcessorContext, we can use it transparently to support any use case 
where we really don't care exactly what calls the unit under test makes into 
the context.
   
   EasyMock is very powerful, but it can't magically implement a complex 
interface contract, so it can never fill this gap. On the other hand, if 
there's really only going to be one or two calls, then it's fine. And, of 
course, if we _need_ to verify interactions, exceptions, etc., then EasyMock 
gives us a very powerful set of utilities for it.
   
   Anyway, we don't have to change this right now, but I've certainly spent 
many hours fiddling around with easymock expectations just like this, and it 
provides no value if you just want the thing to behave the way that MIPC 
already behaves. Which is why I pointed it out.
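   As a purely illustrative contrast (these are not Kafka or test-utils classes),
   the two styles discussed above might look like this, using a made-up context
   interface:
   
   ```java
   import static org.easymock.EasyMock.expect;
   import static org.easymock.EasyMock.mock;
   import static org.easymock.EasyMock.replay;
   
   public class MockStyleExample {
   
       // Made-up stand-in for a wide context interface.
       public interface FakeableContext {
           String applicationId();
           String changelogFor(String storeName);
       }
   
       // Hand-rolled fake: satisfies the whole contract with sensible behavior,
       // useful when the test does not care which calls the unit under test makes.
       static final class SimpleFakeContext implements FakeableContext {
           @Override public String applicationId() { return "app"; }
           @Override public String changelogFor(final String storeName) {
               return "app-" + storeName + "-changelog";
           }
       }
   
       public static void main(final String[] args) {
           // EasyMock stubbing: fine when only one or two calls matter, or when
           // interactions need to be verified.
           final FakeableContext stubbed = mock(FakeableContext.class);
           expect(stubbed.applicationId()).andStubReturn("app");
           expect(stubbed.changelogFor("store")).andStubReturn("app-store-changelog");
           replay(stubbed);
   
           System.out.println(stubbed.changelogFor("store"));
           System.out.println(new SimpleFakeContext().changelogFor("store"));
       }
   }
   ```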





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-06-26 Thread GitBox


showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r446126644



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -125,35 +131,37 @@ public InternalTopicManager(final Admin adminClient, 
final StreamsConfig streams
 
 final CreateTopicsResult createTopicsResult = 
adminClient.createTopics(newTopics);
 
-for (final Map.Entry<String, KafkaFuture<Void>> 
createTopicResult : createTopicsResult.values().entrySet()) {
-final String topicName = createTopicResult.getKey();
-try {
-createTopicResult.getValue().get();
-topicsNotReady.remove(topicName);
-} catch (final InterruptedException fatalException) {
-// this should not happen; if it ever happens it 
indicate a bug
-Thread.currentThread().interrupt();
-log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
-throw new 
IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
-} catch (final ExecutionException executionException) {
-final Throwable cause = executionException.getCause();
-if (cause instanceof TopicExistsException) {
-// This topic didn't exist earlier or its leader 
not known before; just retain it for next round of validation.
-log.info("Could not create topic {}. Topic is 
probably marked for deletion (number of partitions is unknown).\n" +
-"Will retry to create this topic in {} ms (to 
let broker finish async delete operation first).\n" +
-"Error message was: {}", topicName, 
retryBackOffMs, cause.toString());
-} else {
-log.error("Unexpected error during topic creation 
for {}.\n" +
-"Error message was: {}", topicName, 
cause.toString());
-throw new StreamsException(String.format("Could 
not create topic %s.", topicName), cause);
+if (createTopicsResult != null) {
+for (final Map.Entry<String, KafkaFuture<Void>> 
createTopicResult : createTopicsResult.values().entrySet()) {

Review comment:
   We need the null check for `createTopicsResult`, since `newTopics` might be 
empty.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-26 Thread Neo Wu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146724#comment-17146724
 ] 

Neo Wu commented on KAFKA-10134:


Something like this fixes my issue, but I am not sure whether it is the right 
thing to do for the bigger picture:
{code:java}
// poll for new data until the timeout expires
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = null;
do {
    client.maybeTriggerWakeup();

    if (includeMetadataInTimeout) {
        // try to update assignment metadata BUT do not need to block on the timer if we still have
        // some assigned partitions, since even if we are 1) in the middle of a rebalance
        // or 2) have partitions with unknown starting positions we may still want to return some data
        // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
        // to never block on completing the rebalance procedure if there's any
        if (subscriptions.fetchablePartitions(tp -> true).isEmpty() || records == null || records.isEmpty()) {
            updateAssignmentMetadataIfNeeded(timer);
        } else {
            final Timer updateMetadataTimer = time.timer(0L);
            updateAssignmentMetadataIfNeeded(updateMetadataTimer);
            timer.update(updateMetadataTimer.currentTimeMs());
        }
    } else {
        while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
            log.warn("Still waiting for metadata");
        }
    }

    records = pollForFetches(timer);
    if (!records.isEmpty()) {
        // before returning the fetched records, we can send off the next round of fetches
        // and avoid block waiting for their responses to enable pipelining while the user
        // is handling the fetched records.
        //
        // NOTE: since the consumed position has already been updated, we must not allow
        // wakeups or any other errors to be triggered prior to returning the fetched records.
        if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
            client.transmitSends();
        }

        return this.interceptors.onConsume(new ConsumerRecords<>(records));
    }
} while (timer.notExpired());
{code}

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during rebalances, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some tasks are long-running, >30s), the CPU goes 
> sky-high. It reads ~700% in our metrics, so several threads must be in a 
> tight loop. We have several consumer threads consuming from different 
> partitions during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager protocol the high CPU usage drops once 
> the rebalance is done, but with the cooperative one the consumer threads seem 
> to be stuck on something and cannot finish the rebalance, so the high CPU 
> usage does not drop until we stop our load. A small load without long-running 
> tasks also does not cause continuous high CPU usage, since the rebalance can 
> finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found that the clients appear to be in a loop 
> finding the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue, so it 
> seems related to something that changed between 2.4.1 and 

[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-26 Thread Neo Wu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146723#comment-17146723
 ] 

Neo Wu commented on KAFKA-10134:


Maybe besides fetchablePartitions, it should also check records = 
pollForFetches(timer)?

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during rebalances, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some tasks are long-running, >30s), the CPU goes 
> sky-high. It reads ~700% in our metrics, so several threads must be in a 
> tight loop. We have several consumer threads consuming from different 
> partitions during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager protocol the high CPU usage drops once 
> the rebalance is done, but with the cooperative one the consumer threads seem 
> to be stuck on something and cannot finish the rebalance, so the high CPU 
> usage does not drop until we stop our load. A small load without long-running 
> tasks also does not cause continuous high CPU usage, since the rebalance can 
> finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found that the clients appear to be in a loop 
> finding the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue, so it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-26 Thread Neo Wu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146721#comment-17146721
 ] 

Neo Wu commented on KAFKA-10134:


Hi, [~guozhang]

Thanks for the quick update! 

I reviewed and tested your patch and still found an issue:
{code:java}
if (subscriptions.fetchablePartitions(tp -> true).isEmpty()) {
    updateAssignmentMetadataIfNeeded(timer);
} else {
    final Timer updateMetadataTimer = time.timer(0L);
    updateAssignmentMetadataIfNeeded(updateMetadataTimer);
    timer.update(updateMetadataTimer.currentTimeMs());
}
{code}
There are two scenarios.

1) Start the Java consumer with Kafka stopped: the code works as expected. It 
flows into the first branch and waits on the timer, and it is able to connect 
to Kafka once Kafka comes up (with low CPU usage thanks to the timer).

However, in the second scenario:

2) Start both the Java consumer and Kafka, let the consumer successfully 
subscribe to some topics, then force-stop Kafka. Now 
subscriptions.fetchablePartitions(tp -> true) returns a non-empty result, so 
the code goes into the second branch without blocking, which triggers a busy 
CPU wait (see the sketch below).
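For illustration only, here is a rough sketch (not the actual patch) of the kind of 
extra condition I have in mind, assuming a hypothetical lastFetchReturnedNoRecords 
flag derived from the result of pollForFetches(timer):
{code:java}
// Illustrative sketch only, not the real KafkaConsumer code: also fall back to
// the blocking branch when the previous fetch returned nothing, so that a
// stopped broker does not push us into the non-blocking branch and a busy loop.
if (subscriptions.fetchablePartitions(tp -> true).isEmpty() || lastFetchReturnedNoRecords) {
    updateAssignmentMetadataIfNeeded(timer);
} else {
    final Timer updateMetadataTimer = time.timer(0L);
    updateAssignmentMetadataIfNeeded(updateMetadataTimer);
    timer.update(updateMetadataTimer.currentTimeMs());
}
{code}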

Thanks,

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance as our tasks are long running task.
> But after the upgrade when we try to kill an instance to let rebalance happen 
> when there is some load(some are long running tasks >30S) there, the CPU will 
> go sky-high. It reads ~700% in our metrics so there should be several threads 
> are in a tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible in both the 
> new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The 
> difference is that with old eager rebalance rebalance protocol used the high 
> CPU usage will dropped after the rebalance done. But when using cooperative 
> one, it seems the consumers threads are stuck on something and couldn't 
> finish the rebalance so the high CPU usage won't drop until we stopped our 
> load. Also a small load without long running task also won't cause continuous 
> high CPU usage as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found it looks like the clients are  in a loop 
> on finding the coordinator.
> I also tried the old rebalance protocol for the new version the issue still 
> exists but the CPU will be back to normal when the rebalance is done.
> Also tried the same on the 2.4.1 which seems don't have this issue. So it 
> seems related something changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10207) Untrimmed Index files cause premature log segment deletions on startup

2020-06-26 Thread Johnny Malizia (Jira)
Johnny Malizia created KAFKA-10207:
--

 Summary: Untrimmed Index files cause premature log segment 
deletions on startup
 Key: KAFKA-10207
 URL: https://issues.apache.org/jira/browse/KAFKA-10207
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 2.4.1, 2.3.1, 2.4.0
Reporter: Johnny Malizia


[KIP-263|https://cwiki.apache.org/confluence/display/KAFKA/KIP-263%3A+Allow+broker+to+skip+sanity+check+of+inactive+segments+on+broker+startup#KIP263:Allowbrokertoskipsanitycheckofinactivesegmentsonbrokerstartup-Evaluation]
 appears to have introduced a change that explicitly skips calling the 
sanityCheck method on the time and offset index files loaded by Kafka at 
startup. I found a particularly nasty bug with the following configuration:
{code:java}
jvm=1.8.0_191 zfs=0.6.5.6 kernel=4.4.0-1013-aws kafka=2.4.1{code}
The bug was that the retention period, whether configured at the topic or the 
broker level, seemed not to be respected: no matter what, when the broker 
started up it would decide that all log segments on disk were breaching the 
retention window, and the data would be purged away.

 
{code:java}
Found deletable segments with base offsets [11610665,12130396,12650133] due to 
retention time 8640ms breach {code}
{code:java}
Rolled new log segment at offset 12764291 in 1 ms. (kafka.log.Log)
Scheduling segments for deletion List(LogSegment(baseOffset=11610665, 
size=1073731621, lastModifiedTime=1592532125000, largestTime=0), 
LogSegment(baseOffset=12130396, size=1073727967, 
lastModifiedTime=1592532462000, largestTime=0), LogSegment(baseOffset=12650133, 
size=235891971, lastModifiedTime=1592532531000, largestTime=0)) {code}
Further logging showed that this issue occurred while loading the files, 
indicating that the final writes to trim the index had not succeeded:
{code:java}
DEBUG Loaded index file 
/mnt/kafka-logs/test_topic-0/17221277.timeindex with maxEntries = 
873813, maxIndexSize = 10485760, entries = 873813, lastOffset = 
TimestampOffset(0,17221277), file position = 10485756 
(kafka.log.TimeIndex){code}
 

So, because the index keeps the preallocated zero bytes at its tail, when the 
index is loaded again after restarting Kafka the largest timestamp reads as 0, 
and this leads to premature TTL deletion of the log segments (see the sketch 
below).
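
As a simplified sketch (not the actual LogManager code), the retention-by-time check 
effectively does something like this, which is why a zeroed largestTime makes every 
segment look ancient:
{code:java}
// Simplified sketch of a retention-by-time check, assuming a retention.ms of
// one day. With largestTimestamp read back as 0 from the untrimmed index, the
// computed segment "age" is roughly the current wall-clock time, which exceeds
// any realistic retention window, so the segment is scheduled for deletion.
public class RetentionSketch {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long retentionMs = 86_400_000L;   // 1 day
        long largestTimestamp = 0L;       // value loaded from the zero-filled index tail
        boolean breach = now - largestTimestamp > retentionMs;
        System.out.println("deletable: " + breach);  // prints true
    }
}
{code}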

 

I tracked the issue down to the JVM version being used, as upgrading the JVM 
resolved it. Still, I think Kafka should never delete data by mistake like 
this, as doing a rolling restart with this bug in place would cause complete 
data loss across the cluster.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10189) Reset metric EventQueueTimeMs

2020-06-26 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10189:

Labels:   (was: core)

> Reset metric EventQueueTimeMs 
> --
>
> Key: KAFKA-10189
> URL: https://issues.apache.org/jira/browse/KAFKA-10189
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core, metrics
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Minor
>
> The metric 
> [EventQueueTimeMs|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerEventManager.scala#L81]
>  does not reset and therefore misrepresents the controller event queue time 
> in these two scenarios:
>  1. upon losing leader election - `EventQueueTimeMs` portrays the last event 
> queue time of the previous controller and not the current controller
>  2. no controller events are added to the queue - `EventQueueTimeMs` portrays 
> the most recent event queue time, not the current queue time (which is 0)
> For both cases, we should reset the controller event queue time to 0.
> Implementation:
> Instead of using `LinkedBlockingQueue.take()` 
> [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerEventManager.scala#L118],
>  we can use `LinkedBlockingQueue.poll(long timeout, TimeUnit unit)` and reset 
> `EventQueueTimeMs` if the queue is empty.
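
A minimal sketch of that poll-with-timeout pattern (illustrative Java only; the real 
ControllerEventManager is Scala, and the histogram reset is stubbed out here):
{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustration of the proposed change: poll with a timeout instead of take(),
// and reset the queue-time metric whenever the poll times out on an empty queue.
public class EventQueueSketch {
    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    public void doWork() throws InterruptedException {
        Runnable event = queue.poll(300, TimeUnit.MILLISECONDS);
        if (event == null) {
            resetEventQueueTimeHistogram();  // queue stayed empty; report 0 instead of a stale value
        } else {
            event.run();
        }
    }

    private void resetEventQueueTimeHistogram() { /* clear the histogram here */ }
}
{code}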



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10189) Reset metric EventQueueTimeMs

2020-06-26 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10189:

Labels: core  (was: pull-request-available)

> Reset metric EventQueueTimeMs 
> --
>
> Key: KAFKA-10189
> URL: https://issues.apache.org/jira/browse/KAFKA-10189
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, metrics
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Minor
>  Labels: core
>
> The metric 
> [EventQueueTimeMs|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerEventManager.scala#L81]
>  does not reset and therefore misrepresents the controller event queue time 
> in these two scenarios:
>  1. upon losing leader election - `EventQueueTimeMs` portrays the last event 
> queue time of the previous controller and not the current controller
>  2. no controller events are added to the queue - `EventQueueTimeMs` portrays 
> the most recent event queue time, not the current queue time (which is 0)
> For both cases, we should reset the controller event queue time to 0.
> Implementation:
> Instead of using `LinkedBlockingQueue.take()` 
> [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerEventManager.scala#L118],
>  we can use `LinkedBlockingQueue.poll(long timeout, TimeUnit unit)` and reset 
> `EventQueueTimeMs` if the queue is empty.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10189) Reset metric EventQueueTimeMs

2020-06-26 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10189:

Component/s: core

> Reset metric EventQueueTimeMs 
> --
>
> Key: KAFKA-10189
> URL: https://issues.apache.org/jira/browse/KAFKA-10189
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core, metrics
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Minor
>  Labels: core
>
> The metric 
> [EventQueueTimeMs|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerEventManager.scala#L81]
>  does not reset and therefore misrepresents the controller event queue time 
> in these two scenarios:
>  1. upon losing leader election - `EventQueueTimeMs` portrays the last event 
> queue time of the previous controller and not the current controller
>  2. no controller events are added to the queue - `EventQueueTimeMs` portrays 
> the most recent event queue time, not the current queue time (which is 0)
> For both cases, we should reset the controller event queue time to 0.
> Implementation:
> Instead of using `LinkedBlockingQueue.take()` 
> [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerEventManager.scala#L118],
>  we can use `LinkedBlockingQueue.poll(long timeout, TimeUnit unit)` and reset 
> `EventQueueTimeMs` if the queue is empty.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-26 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146707#comment-17146707
 ] 

Almog Gavra commented on KAFKA-10179:
-

[~ableegoldman] I confirmed locally that nothing "breaks" if we use a 
deserializer that projects a subset of the fields in the record, as you 
suspected, but consider the following points:
 # Some of the most popular serdes are asymmetric (e.g. avro builds in the 
concept of reader/writer schema into their APIs)
 # It may be impossible to determine, for a given serde, whether it is symmetric
 # State after recovery should be identical to before recovery for predictable 
operations (especially in cloud environments)
 # Some of the most popular serdes have side effects (e.g. Confluent schema 
registry serdes will create subjects on your behalf)

In practice, the first three points, in conjunction with what [~mjsax] said (the 
source-topic-changelog optimization really only applies if the data in the 
input topic is exactly the same as in the changelog topic, and thus we avoid 
creating the changelog topic), mean that we can't safely turn on the 
source-topic-changelog optimization unless the user indicates either (a) they 
are using a symmetrical serde or (b) they are willing to waive point 3 in order 
to speed up recovery ([~cadonna] if we consider point 3 a matter of 
correctness, we can't sacrifice correctness for performance without the user's 
consent).

Even if the user indicates (a) or (b) above, I still don't think we can 
implement the fix described here, because of the fourth point. The user may be 
using a symmetric serde whose schema is nevertheless not identical to the one 
that wrote to the Kafka topic (ksql, for example, generates a new schema where 
all the fields are the same but the schema has a different name; I can also 
easily imagine a schema with _more_ fields that would write the same value as 
it read from an event with fewer fields).

I'm not sure I understand this comment: "The data would be deserialized and 
re-serialized using the same Serde (this is inefficiency we pay, as we also 
need to send the de-serialized data downstream for further processing)." Why 
can't we just always pass-through the data into the state store if the 
optimization is enabled?
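
To make point 1 concrete, here is a toy example (not from any real codebase) of an 
asymmetric (de)serializer: the reader projects away fields, so deserializing and 
re-serializing does not reproduce the bytes the writer originally produced, and 
restored state would differ from pre-recovery state:
{code:java}
import java.nio.charset.StandardCharsets;

// Toy illustration of an asymmetric serde: the deserializer keeps only the
// first field, so a deserialize/serialize round trip loses information.
public class ProjectingSerdeExample {
    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8).split(",")[0];
    }
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
    public static void main(String[] args) {
        byte[] sourceTopicBytes = "42,alice,wonderland".getBytes(StandardCharsets.UTF_8);
        byte[] restoredBytes = serialize(deserialize(sourceTopicBytes));
        System.out.println(new String(restoredBytes, StandardCharsets.UTF_8)); // "42", not the original record
    }
}
{code}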

 

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes 
> {{<application ID>-<state store name>-changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.
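
For context, a minimal sketch of a serializer that actually depends on the topic name 
it is handed (in the spirit of the Schema Registry serdes mentioned above; the subject 
naming here is only illustrative):
{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// Minimal sketch of a topic-sensitive serializer: it derives a "subject" from
// the topic name, so being handed the wrong changelog topic changes its output
// (and, for real registry-backed serdes, which subject gets registered).
public class TopicSensitiveSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        String subject = topic + "-value";  // illustrative subject naming
        return (subject + ":" + data).getBytes(StandardCharsets.UTF_8);
    }
}
{code}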



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-26 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146706#comment-17146706
 ] 

Guozhang Wang commented on KAFKA-10134:
---

I've provided a PR for trunk, but it should apply cleanly to 2.5 as well (I 
will cherry-pick it when merging). [~neowu0] [~seanguo] please let me know if 
you could apply the patch and verify whether it helps resolve the issue.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance as our tasks are long running task.
> But after the upgrade when we try to kill an instance to let rebalance happen 
> when there is some load(some are long running tasks >30S) there, the CPU will 
> go sky-high. It reads ~700% in our metrics so there should be several threads 
> are in a tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible in both the 
> new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The 
> difference is that with old eager rebalance rebalance protocol used the high 
> CPU usage will dropped after the rebalance done. But when using cooperative 
> one, it seems the consumers threads are stuck on something and couldn't 
> finish the rebalance so the high CPU usage won't drop until we stopped our 
> load. Also a small load without long running task also won't cause continuous 
> high CPU usage as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found it looks like the clients are  in a loop 
> on finding the coordinator.
> I also tried the old rebalance protocol for the new version the issue still 
> exists but the CPU will be back to normal when the rebalance is done.
> Also tried the same on the 2.4.1 which seems don't have this issue. So it 
> seems related something changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeffkbkim opened a new pull request #8935: reset event queue time histogram when queue is empty

2020-06-26 Thread GitBox


jeffkbkim opened a new pull request #8935:
URL: https://github.com/apache/kafka/pull/8935


   
   * https://issues.apache.org/jira/browse/KAFKA-10189
   * add a timeout for event queue time histogram
   * reset `eventQueueTimeHist` when the controller event queue is empty
   * unit test for resetting event queue time histogram 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions

2020-06-26 Thread GitBox


guozhangwang commented on pull request #8934:
URL: https://github.com/apache/kafka/pull/8934#issuecomment-650464132


   @hachikuji @ijuma 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang opened a new pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions

2020-06-26 Thread GitBox


guozhangwang opened a new pull request #8934:
URL: https://github.com/apache/kafka/pull/8934


   The intention of using poll(0) is to not block on rebalance but still return 
some data; however, if there are no fetchable partitions then there is no point 
in polling with 0ms. What's worse, with poll(0) we may fall into a busy loop, 
since we may advance the system test at too small a pace.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


guozhangwang commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446461812



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##
@@ -258,34 +263,43 @@ private void logValue(final Bytes key, final BufferKey 
bufferKey, final BufferVa
 final int sizeOfBufferTime = Long.BYTES;
 final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
 buffer.putLong(bufferKey.time());
-
+final byte[] array = buffer.array();
 ((RecordCollector.Supplier) context).recordCollector().send(
-changelogTopic,
-key,
-buffer.array(),
-V_2_CHANGELOG_HEADERS,
-partition,
-null,
-KEY_SERIALIZER,
-VALUE_SERIALIZER
+changelogTopic,
+key,
+array,
+CHANGELOG_HEADERS,
+partition,
+null,
+KEY_SERIALIZER,
+VALUE_SERIALIZER
 );
 }
 
 private void logTombstone(final Bytes key) {
 ((RecordCollector.Supplier) context).recordCollector().send(
-changelogTopic,
-key,
-null,
-null,
-partition,
-null,
-KEY_SERIALIZER,
-VALUE_SERIALIZER
+changelogTopic,
+key,
+null,
+null,

Review comment:
   SG





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650462969


   Ok, @guozhangwang , This is my "final" iteration. I pulled the system tests 
out, and I'll follow up with another PR later. This PR should be sufficient for 
the basic purpose, thanks to the new "binary" compatibility unit tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-6453) Reconsider timestamp propagation semantics

2020-06-26 Thread Victoria Bialas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Bialas resolved KAFKA-6453.

Resolution: Fixed

Fixed by James Galasyn in https://github.com/apache/kafka/pull/8920

> Reconsider timestamp propagation semantics
> --
>
> Key: KAFKA-6453
> URL: https://issues.apache.org/jira/browse/KAFKA-6453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Victoria Bialas
>Priority: Major
>  Labels: needs-kip
>
> Atm, Kafka Streams only has a defined "contract" about timestamp propagation 
> at the Processor API level: all processor within a sub-topology, see the 
> timestamp from the input topic record and this timestamp will be used for all 
> result record when writing them to an topic, too.
> The DSL, inherits this "contract" atm.
> From a DSL point of view, it would be desirable to provide a different 
> contract to the user. To allow this, we need to do the following:
>  - extend Processor API to allow manipulation timestamps (ie, a Processor can 
> set a new timestamp for downstream records)
>  - define a DSL "contract" for timestamp propagation for each DSL operator
>  - document the DSL "contract"
>  - implement the DSL "contract" using the new/extended Processor API
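
For reference, the Processor API side of the first bullet can be sketched roughly like 
this (a sketch assuming the To.all().withTimestamp(...) forwarding API; illustration 
only):
{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

// Sketch: a processor that forwards records downstream with an explicitly
// overridden timestamp instead of inheriting the input record's timestamp.
public class TimestampOverridingProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        final long newTimestamp = context().timestamp() + 1_000L;  // illustrative adjustment
        context().forward(key, value, To.all().withTimestamp(newTimestamp));
    }
}
{code}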



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman closed pull request #6633: [MINOR] Better estimate size of cache

2020-06-26 Thread GitBox


ableegoldman closed pull request #6633:
URL: https://github.com/apache/kafka/pull/6633


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman closed pull request #7413: KAFKA-8649: version probing with upgraded leader

2020-06-26 Thread GitBox


ableegoldman closed pull request #7413:
URL: https://github.com/apache/kafka/pull/7413


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10177) Replace/improve Percentiles metrics

2020-06-26 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146681#comment-17146681
 ] 

Sophie Blee-Goldman commented on KAFKA-10177:
-

I haven't personally looked into HdrHistogram specifically, but I think the 
approach of porting over an existing and well-tested implementation is the 
right way to go.

It's probably not worth an extra dependency and shouldn't be too complicated to 
re-implement a reasonable percentiles algorithm (famous last words, I know...)
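
For reference, if the library route were taken, HdrHistogram usage is roughly this (a 
sketch assuming the org.hdrhistogram:HdrHistogram dependency, which as noted above is 
probably not worth adding):
{code:java}
import org.HdrHistogram.Histogram;

// Sketch of recording latencies with HdrHistogram: an auto-ranging histogram
// with 3 significant decimal digits of precision, queried for the 99th percentile.
public class HdrSketch {
    public static void main(String[] args) {
        Histogram histogram = new Histogram(3);
        histogram.recordValue(1_250);   // e.g. a latency sample in microseconds
        histogram.recordValue(980);
        long p99 = histogram.getValueAtPercentile(99.0);
        System.out.println("p99 = " + p99);
    }
}
{code}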

> Replace/improve Percentiles metrics
> ---
>
> Key: KAFKA-10177
> URL: https://issues.apache.org/jira/browse/KAFKA-10177
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> There's an existing – but seemingly unused – implementation of percentile 
> metrics that we attempted to use for end-to-end latency metrics in Streams. 
> Unfortunately a number of limitations became apparent, and we ultimately 
> pulled the metrics from the 2.6 release pending further 
> investigation/improvement.
> The problems we encountered were
>  # Need to set a static upper/lower limit for the values
>  # Not well suited to a distribution with a long tail, ie setting the max 
> value too high caused the accuracy to plummet
>  # Required a lot of memory per metric for reasonable accuracy and caused us 
> to hit OOM (unclear if there was actually a memory leak, or it was just 
> gobbling up unnecessarily large amounts in general)
> Since the Percentiles class is part of the public API, we may need to create 
> a new class altogether and possibly deprecate/remove the old one. 
> Alternatively we can consider just re-implementing the existing class from 
> scratch, and just deprecating the current constructors and associated 
> implementation (eg the constructor accepts a max)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-26 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146678#comment-17146678
 ] 

Guozhang Wang commented on KAFKA-10134:
---

[~seanguo] I think the issue [~neowu0] brought up is a valid one to tackle, but 
I'm not sure it has the same root cause as what you've seen: basically, the 
consumer may be tied up in the metadata refresh loop if it cannot find the 
coordinator, but that is only the case if the coordinator broker is indeed 
unavailable during that time.

In your observation, though, there should be no broker unavailability, since 
you're just bouncing the consumer instance and the coordinator should be quick 
to discover.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance as our tasks are long running task.
> But after the upgrade when we try to kill an instance to let rebalance happen 
> when there is some load(some are long running tasks >30S) there, the CPU will 
> go sky-high. It reads ~700% in our metrics so there should be several threads 
> are in a tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible in both the 
> new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The 
> difference is that with old eager rebalance rebalance protocol used the high 
> CPU usage will dropped after the rebalance done. But when using cooperative 
> one, it seems the consumers threads are stuck on something and couldn't 
> finish the rebalance so the high CPU usage won't drop until we stopped our 
> load. Also a small load without long running task also won't cause continuous 
> high CPU usage as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found it looks like the clients are  in a loop 
> on finding the coordinator.
> I also tried the old rebalance protocol for the new version the issue still 
> exists but the CPU will be back to normal when the rebalance is done.
> Also tried the same on the 2.4.1 which seems don't have this issue. So it 
> seems related something changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10166) Excessive TaskCorruptedException seen in testing

2020-06-26 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-10166.
-
Resolution: Fixed

> Excessive TaskCorruptedException seen in testing
> 
>
> Key: KAFKA-10166
> URL: https://issues.apache.org/jira/browse/KAFKA-10166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.6.0
>
>
> As the title indicates, long-running test applications with injected network 
> "outages" seem to hit TaskCorruptedException more than expected.
> Seen occasionally on the ALOS application (~20 times in two days in one case, 
> for example), and very frequently with EOS (many times per day)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-26 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-10134:
-

Assignee: Guozhang Wang

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance as our tasks are long running task.
> But after the upgrade when we try to kill an instance to let rebalance happen 
> when there is some load(some are long running tasks >30S) there, the CPU will 
> go sky-high. It reads ~700% in our metrics so there should be several threads 
> are in a tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible in both the 
> new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The 
> difference is that with old eager rebalance rebalance protocol used the high 
> CPU usage will dropped after the rebalance done. But when using cooperative 
> one, it seems the consumers threads are stuck on something and couldn't 
> finish the rebalance so the high CPU usage won't drop until we stopped our 
> load. Also a small load without long running task also won't cause continuous 
> high CPU usage as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found it looks like the clients are  in a loop 
> on finding the coordinator.
> I also tried the old rebalance protocol for the new version the issue still 
> exists but the CPU will be back to normal when the rebalance is done.
> Also tried the same on the 2.4.1 which seems don't have this issue. So it 
> seems related something changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task

2020-06-26 Thread GitBox


guozhangwang commented on pull request #8926:
URL: https://github.com/apache/kafka/pull/8926#issuecomment-650426719


   Cherry-picked to 2.6 since it is a blocker, cc @rhauch 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task

2020-06-26 Thread GitBox


guozhangwang merged pull request #8926:
URL: https://github.com/apache/kafka/pull/8926


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

2020-06-26 Thread GitBox


ableegoldman commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r446431689



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##
@@ -226,7 +227,7 @@ public StreamsMetadata getLocalMetadata() {
 if (thisHost.equals(UNKNOWN_HOST)) {
 return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), 
Collections.emptySet(), -1);
 }
-return new KeyQueryMetadata(localMetadata.hostInfo(), 
Collections.emptySet(), -1);
+return new KeyQueryMetadata(localMetadata.get().hostInfo(), 
Collections.emptySet(), -1);

Review comment:
   I looked through the StreamsMetadataState and it does seem like it could 
technically be null if this instance was never assigned any active or standby 
tasks at all, ever. That really _shouldn't_ happen, but of course it can if you 
massively over-provisioned your app, and we shouldn't throw an NPE over that.
   
   Seems like this is actually an existing bug that we should fix. Then we can 
improve the initialization check on the side.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishbellapu commented on pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration

2020-06-26 Thread GitBox


satishbellapu commented on pull request #8921:
URL: https://github.com/apache/kafka/pull/8921#issuecomment-650419736


   + @cmccabe @rajinisivaram for review.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8925: KAFKA-9974: Make produce-sync flush

2020-06-26 Thread GitBox


ableegoldman commented on pull request #8925:
URL: https://github.com/apache/kafka/pull/8925#issuecomment-650414999


   Is Jenkins linking to the wrong builds? When I try to see which tests failed 
here, it seems to bring me to the results for two completely different PRs. 
   
   Really dropping the ball lately, Mr. Jenkins.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650413094


   I will follow up shortly to extract the system tests to a separate PR, since 
we're having trouble running the tests at all right now, and we wouldn't know 
if they are even more broken.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650412771


   Hey @guozhangwang , you might want to take a look at that last fix. The 
duck-typing code was sometimes producing an OOME, when it would interpret a 
random integer out of the buffer as a "size" and blindly allocate an array of 
that size.
   
   I added a Util (with tests) that has a guard to prevent this.
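   
   Presumably something along these lines (a generic sketch of such a guard, not the 
actual utility that was added):
   
   ```java
   // Generic sketch: never trust a length prefix read out of a buffer; validate it
   // against the bytes actually remaining before allocating an array of that size.
   static byte[] readSizePrefixedArray(final java.nio.ByteBuffer buffer) {
       final int size = buffer.getInt();
       if (size == -1) {
           return null;  // null marker
       }
       if (size < 0 || size > buffer.remaining()) {
           throw new IllegalStateException(
               "Implausible size " + size + " with only " + buffer.remaining() + " bytes remaining");
       }
       final byte[] array = new byte[size];
       buffer.get(array);
       return array;
   }
   ```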



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8927: KAFKA-10200: Fix testability of PAPI with windowed stores

2020-06-26 Thread GitBox


ableegoldman commented on a change in pull request #8927:
URL: https://github.com/apache/kafka/pull/8927#discussion_r446418844



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+public final class ProcessorContextUtils {

Review comment:
   I'm trying to phrase this in a less discouraging way, since I agree this 
is a nice & clever fix: 
   
   is this meant to be the actual, final solution to this problem, or just a 
temporary workaround to unblock us without the need for a KIP? This just 
"happens" to work out nicely because we know the mocks are actually returning a 
StreamsMetricsImpl as well. What happens if we need to add/access more 
complicated processor context functionality in the inner state stores? I say 
"inner" because the caching layers for example also perform a cast to 
InternalProcessorContext.
   
   I completely agree with proceeding with this, so don't take that question as 
a challenge to this PR. Just wondering where this leaves us going forward. 
Should we accept (and therefore enforce) that state stores can't have caching 
enabled in unit tests?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446409619



##
File path: build.gradle
##
@@ -97,7 +97,7 @@ ext {
   buildVersionFileName = "kafka-version.properties"
 
   defaultMaxHeapSize = "2g"
-  defaultJvmArgs = ["-Xss4m", "-XX:+UseParallelGC"]
+  defaultJvmArgs = ["-Xss4m"]

Review comment:
   Aha! I figured it out. There actually was a bug in the test. While 
duck-typing, the code was trying to allocate an array of 1.8GB. It's funny that 
disabling this flag made this test pass on java 11 and 14. Maybe the flag 
partitions the heap on those versions or something, so the test didn't actually 
have the full 2GB available. Anyway, I'm about to push a fix and put the flag 
back.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

2020-06-26 Thread GitBox


ableegoldman commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r446408964



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##
@@ -226,7 +227,7 @@ public StreamsMetadata getLocalMetadata() {
 if (thisHost.equals(UNKNOWN_HOST)) {
 return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), 
Collections.emptySet(), -1);
 }
-return new KeyQueryMetadata(localMetadata.hostInfo(), 
Collections.emptySet(), -1);
+return new KeyQueryMetadata(localMetadata.get().hostInfo(), 
Collections.emptySet(), -1);

Review comment:
   This gets initialized during the rebalance and IQ isn't available until 
Streams has reached RUNNING. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

2020-06-26 Thread GitBox


abbccdda commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r446404393



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##
@@ -50,7 +51,7 @@
 private final HostInfo thisHost;
 private List allMetadata = Collections.emptyList();
 private Cluster clusterMetadata;
-private StreamsMetadata localMetadata;
+private AtomicReference localMetadata = new 
AtomicReference<>(null);

Review comment:
   This could be final

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##
@@ -226,7 +227,7 @@ public StreamsMetadata getLocalMetadata() {
 if (thisHost.equals(UNKNOWN_HOST)) {
 return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), 
Collections.emptySet(), -1);
 }
-return new KeyQueryMetadata(localMetadata.hostInfo(), 
Collections.emptySet(), -1);
+return new KeyQueryMetadata(localMetadata.get().hostInfo(), 
Collections.emptySet(), -1);

Review comment:
   Hmm, why do we still keep it? Based on the reviews of the previous version, 
I believe there is some strict ordering that gets `localMetadata` initialized 
to non-null on L352 before hitting this logic, but a null check still sounds 
more resilient to me, unless we want a NullPointerException to be thrown 
explicitly (see the sketch below).
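   
   i.e. something along these lines (a sketch of the defensive variant, not 
necessarily the final code):
   
   ```java
   // Sketch of the defensive null check being suggested, instead of letting
   // localMetadata.get() NPE when metadata has not been initialized yet.
   final StreamsMetadata local = localMetadata.get();
   if (local == null) {
       throw new IllegalStateException("Local metadata is not yet initialized");
   }
   return new KeyQueryMetadata(local.hostInfo(), Collections.emptySet(), -1);
   ```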





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ryannedolan commented on pull request #8930: Mirror Maker 2: offset-syncs variable

2020-06-26 Thread GitBox


ryannedolan commented on pull request #8930:
URL: https://github.com/apache/kafka/pull/8930#issuecomment-650389551


   @cgetzen the larger change would be worth it IMO. I think a KIP is the right 
way forward. Happy to help, but you should be able to get permission to create 
a KIP yourself. Might just take a few tries, unfortunately. Maybe check who has 
been responding to such requests lately and reach out to them directly.
   
   Are there other properties that we could make more customizable in this way 
too? Might be a good time to address them all together.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650387794


   Ah, that heap space thing was legit. Fix coming...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task

2020-06-26 Thread GitBox


abbccdda commented on pull request #8926:
URL: https://github.com/apache/kafka/pull/8926#issuecomment-650385489


   ```12:23:50 FAILURE: Build failed with an exception.
   12:23:50 
   12:23:50 * What went wrong:
   12:23:50 Execution failed for task ':streams:unitTest'.
   12:23:50 > Process 'Gradle Test Executor 9' finished with non-zero exit 
value 134
   12:23:50   This problem might be caused by incorrect test process 
configuration.
   12:23:50   Please refer to the test execution section in the User Manual at 
https://docs.gradle.org/6.5/userguide/java_testing.html#sec:test_execution
   12:23:50 ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

2020-06-26 Thread GitBox


lct45 commented on pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#issuecomment-650384491


   Failed Connect tests on JDK 14:
   
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
   
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStart
   
org.apache.kafka.connect.integration.BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStart
   
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorInitialize



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task

2020-06-26 Thread GitBox


ableegoldman commented on pull request #8926:
URL: https://github.com/apache/kafka/pull/8926#issuecomment-650382721


   Java 8 and 14 builds passed, Java 11 build failed with...zero failures?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cgetzen edited a comment on pull request #8930: Mirror Maker 2: offset-syncs variable

2020-06-26 Thread GitBox


cgetzen edited a comment on pull request #8930:
URL: https://github.com/apache/kafka/pull/8930#issuecomment-650381294


   Hi @ryannedolan, wondering if you could spare a little guidance.
   
   I created an `offset-syncs.topic.override` variable with a default of `null`. 
What I would have loved to do is create something like `offset-syncs.topic` 
with a default value of `mm2-offsets-sync..internal`, but I was unable to, due 
to the alias in the string, and I think it would require a much larger change 
to do so. Thoughts?
   
   I also emailed d...@kafka.apache.org to get access to create a KIP, but that 
seems to be a dead end. If I get together the content for a KIP, would you be 
willing to submit it?
   
   Thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication

2020-06-26 Thread Leah Thomas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146612#comment-17146612
 ] 

Leah Thomas commented on KAFKA-9509:


Failed again:
h3. Stacktrace

java.lang.RuntimeException: Could not find enough records. found 0, expected 
100 at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435)
 at 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:221)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:564)

 

[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1292/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testReplication/]

 

> Fix flaky test MirrorConnectorsIntegrationTest.testReplication
> --
>
> Key: KAFKA-9509
> URL: https://issues.apache.org/jira/browse/KAFKA-9509
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Sanjana Kaundinya
>Assignee: Luke Chen
>Priority: Major
> Fix For: 2.5.0
>
>
> The test 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
>  is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of 
> when the connectors and tasks are started up. The fix for this would make it 
> such that when the connectors are started up, to wait until the REST endpoint 
> returns a positive number of tasks to be confident that we can start testing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cgetzen commented on pull request #8930: Mirror Maker 2: offset-syncs variable

2020-06-26 Thread GitBox


cgetzen commented on pull request #8930:
URL: https://github.com/apache/kafka/pull/8930#issuecomment-650381294


   Hi @ryannedolan, wondering if you could spare a little guidance.
   
   I created an `offset-syncs.topic.override` variable with a default of `null`. 
What I would have loved to do is create something like `offset-syncs.topic` 
with a default value of `mm2-offsets-sync..internal`, but I was unable to, due 
to the alias in the string, and I think it would require a much larger change 
to do so. Thoughts?
   
   I also emailed d...@kafka.apache.org to get access to create a KIP, but that 
seems to be a dead end. If I get together the content for a KIP, would you be 
willing to submit it?
   
   Thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gharris1727 commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

2020-06-26 Thread GitBox


gharris1727 commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r446379953



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -689,6 +692,10 @@ else if (!context.pausedPartitions().isEmpty())
 
 @Override
 public void onPartitionsRevoked(Collection partitions) 
{
+if (taskStopped) {

Review comment:
   I think this gets called by the consumer thread, which is different from 
the thread which calls `close()`. I think that it may be necessary to mark this 
variable as volatile.
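   
   For illustration, the usual pattern (a sketch only, assuming the flag is only ever 
flipped from false to true):
   
   ```java
   // Sketch: a flag written on one thread and read on another needs volatile
   // (or another synchronization mechanism) to guarantee visibility of the write.
   private volatile boolean taskStopped = false;

   void stop() {                                                       // worker thread
       taskStopped = true;
   }

   void onPartitionsRevoked(Collection<TopicPartition> partitions) {   // consumer thread
       if (taskStopped) {
           return;                       // ignore callbacks after the task has stopped
       }
       // ... normal revocation handling ...
   }
   ```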





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10181) AlterConfig/IncrementalAlterConfig should go to controller

2020-06-26 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10181:

Fix Version/s: 2.7.0

> AlterConfig/IncrementalAlterConfig should go to controller
> --
>
> Key: KAFKA-10181
> URL: https://issues.apache.org/jira/browse/KAFKA-10181
> Project: Kafka
>  Issue Type: Sub-task
>  Components: admin
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.7.0
>
>
> In the new Admin client, the request should always be routed towards the 
> controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10181) AlterConfig/IncrementalAlterConfig should go to controller

2020-06-26 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10181:

Component/s: admin

> AlterConfig/IncrementalAlterConfig should go to controller
> --
>
> Key: KAFKA-10181
> URL: https://issues.apache.org/jira/browse/KAFKA-10181
> Project: Kafka
>  Issue Type: Sub-task
>  Components: admin
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the new Admin client, the request should always be routed towards the 
> controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9693) Kafka latency spikes caused by log segment flush on roll

2020-06-26 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146596#comment-17146596
 ] 

João Oliveirinha commented on KAFKA-9693:
-

[~junrao] thanks for the analysis and hypothesis. I didn't want to trigger the 
flush during the log append. In this case, I just configured 
log.flush.interval.ms=1000 and log.flush.scheduler.interval.ms=1000, so 
hopefully I got the same result as using the sysctl commands. But even when 
I used the default settings in my first tests, I could see the spikes in 
latency.

I am using 64 partitions and a replication factor of 2 for the topic. Each 
broker has 3 replica fetchers, so I assume that each follower fetch request 
includes at most ~3 partitions (assuming they are on the same broker).

Regarding the replication throttling: 1) how can I disable it? 2) I rarely saw 
throttling happen during the entire test, but I also feel inclined to believe 
that it is something related to that. Still, that doesn't explain the 200ms, 
since we are seeing throttling of 4ms, unless they somehow add up to 200ms 
almost every time.
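
On (1): assuming the throttle was applied through the standard quota configs, one 
way to clear it is to delete those configs, for example with the Admin client. The 
config names below are the stock throttle configs; whether they are actually set 
in this cluster needs to be checked first, so treat this as a sketch rather than a 
recipe:

{code:java}
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ClearReplicationThrottles {

    private static AlterConfigOp delete(final String name) {
        return new AlterConfigOp(new ConfigEntry(name, ""), AlterConfigOp.OpType.DELETE);
    }

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Topic-level throttled-replica lists (example topic name).
            final ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic");
            // Broker-level throttle rates (example broker id).
            final ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");

            final Map<ConfigResource, Collection<AlterConfigOp>> updates = new HashMap<>();
            updates.put(topic, Arrays.asList(
                    delete("leader.replication.throttled.replicas"),
                    delete("follower.replication.throttled.replicas")));
            updates.put(broker, Arrays.asList(
                    delete("leader.replication.throttled.rate"),
                    delete("follower.replication.throttled.rate")));

            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
{code}

If none of these configs are set, the throttling was not coming from the standard 
quota mechanism in the first place.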

 

> Kafka latency spikes caused by log segment flush on roll
> 
>
> Key: KAFKA-9693
> URL: https://issues.apache.org/jira/browse/KAFKA-9693
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
> Environment: OS: Amazon Linux 2
> Kafka version: 2.2.1
>Reporter: Paolo Moriello
>Assignee: Paolo Moriello
>Priority: Major
>  Labels: Performance, latency, performance
> Attachments: image-2020-03-10-13-17-34-618.png, 
> image-2020-03-10-14-36-21-807.png, image-2020-03-10-15-00-23-020.png, 
> image-2020-03-10-15-00-54-204.png, image-2020-06-23-12-24-46-548.png, 
> image-2020-06-23-12-24-58-788.png, image-2020-06-26-13-43-21-723.png, 
> image-2020-06-26-13-46-52-861.png, image-2020-06-26-14-06-01-505.png, 
> latency_plot2.png
>
>
> h1. Summary
> When a log segment fills up, Kafka rolls over onto a new active segment and 
> forces the flush of the old segment to disk. When this happens, log segment 
> _append_ duration increases, causing significant latency spikes on producer(s) 
> and replica(s). This ticket aims to highlight the problem and propose a 
> simple mitigation: add a new configuration to enable/disable the rolled segment 
> flush.
> h1. 1. Phenomenon
> Response time of produce request (99th ~ 99.9th %ile) repeatedly spikes to 
> ~50x-200x more than usual. For instance, normally 99th %ile is lower than 
> 5ms, but when this issue occurs, it marks 100ms to 200ms. 99.9th and 99.99th 
> %iles even jump to 500-700ms.
> Latency spikes happen at constant frequency (depending on the input 
> throughput), for small amounts of time. All the producers experience a 
> latency increase at the same time.
> h1. !image-2020-03-10-13-17-34-618.png|width=942,height=314!
> {{Example of response time plot observed on a single producer.}}
> URPs rarely appear in correspondence of the latency spikes too. This is 
> harder to reproduce, but from time to time it is possible to see a few 
> partitions going out of sync in correspondence of a spike.
> h1. 2. Experiment
> h2. 2.1 Setup
> Kafka cluster hosted on AWS EC2 instances.
> h4. Cluster
>  * 15 Kafka brokers: (EC2 m5.4xlarge)
>  ** Disk: 1100Gb EBS volumes (4750Mbps)
>  ** Network: 10 Gbps
>  ** CPU: 16 Intel Xeon Platinum 8000
>  ** Memory: 64Gb
>  * 3 Zookeeper nodes: m5.large
>  * 6 producers on 6 EC2 instances in the same region
>  * 1 topic, 90 partitions - replication factor=3
> h4. Broker config
> Relevant configurations:
> {quote}num.io.threads=8
>  num.replica.fetchers=2
>  offsets.topic.replication.factor=3
>  num.network.threads=5
>  num.recovery.threads.per.data.dir=2
>  min.insync.replicas=2
>  num.partitions=1
> {quote}
> h4. Perf Test
>  * Throughput ~6000-8000 (~40-70Mb/s input + replication = ~120-210Mb/s per 
> broker)
>  * record size = 2
>  * Acks = 1, linger.ms = 1, compression.type = none
>  * Test duration: ~20/30min
> h2. 2.2 Analysis
> Our analysis showed a high +correlation between log segment flush count/rate 
> and the latency spikes+. This indicates that the spikes in max latency are 
> related to Kafka behavior on rolling over new segments.
> The other metrics did not show any relevant impact on any hardware component 
> of the cluster, eg. cpu, memory, network traffic, disk throughput...
>  
>  !latency_plot2.png|width=924,height=308!
>  {{Correlation between latency spikes and log segment flush count. p50, p95, 
> p99, p999 and p latencies (left axis, ns) and the flush #count (right 
> axis, stepping blue line in plot).}}
> Kafka schedules logs flushing (this includes flushing the file record 
> containing log entries, the offset index, the timestamp index and the 
> transaction index) 

[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650355203


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650354756


   Hmm. Still saw the OOME in 
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3134/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


guozhangwang commented on pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#issuecomment-650354464


   test this



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446365992



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##
@@ -258,34 +263,43 @@ private void logValue(final Bytes key, final BufferKey 
bufferKey, final BufferVa
 final int sizeOfBufferTime = Long.BYTES;
 final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
 buffer.putLong(bufferKey.time());
-
+final byte[] array = buffer.array();
 ((RecordCollector.Supplier) context).recordCollector().send(
-changelogTopic,
-key,
-buffer.array(),
-V_2_CHANGELOG_HEADERS,
-partition,
-null,
-KEY_SERIALIZER,
-VALUE_SERIALIZER
+changelogTopic,
+key,
+array,
+CHANGELOG_HEADERS,
+partition,
+null,
+KEY_SERIALIZER,
+VALUE_SERIALIZER
 );
 }
 
 private void logTombstone(final Bytes key) {
 ((RecordCollector.Supplier) context).recordCollector().send(
-changelogTopic,
-key,
-null,
-null,
-partition,
-null,
-KEY_SERIALIZER,
-VALUE_SERIALIZER
+changelogTopic,
+key,
+null,
+null,

Review comment:
   I remember considering this when I added the first version header. The 
reason I didn't is that, since the initial version didn't have any headers, 
even if we change the tombstone format in the future, we'll always have to 
interpret a "no header, null value" record as being a "legacy format" 
tombstone, just like we have to interpret a "no header, non-null value" as 
being a "legacy format" data record.
   
   You can think of "no header" as indicating "version 0". Since we haven't 
changed the format of tombstones _yet_, there's no value in adding a "version 
1" flag. We should just wait until we do need to make such a change (if ever).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446364196



##
File path: 
streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static 
org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {

Review comment:
   That's correct.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446364364



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+final class TimeOrderedKeyValueBufferChangelogDeserializationHelper {
+private TimeOrderedKeyValueBufferChangelogDeserializationHelper() {}
+
+static final class DeserializationResult {
+private final long time;
+private final Bytes key;
+private final BufferValue bufferValue;
+
+private DeserializationResult(final long time, final Bytes key, final 
BufferValue bufferValue) {
+this.time = time;
+this.key = key;
+this.bufferValue = bufferValue;
+}
+
+long time() {
+return time;
+}
+
+Bytes key() {
+return key;
+}
+
+BufferValue bufferValue() {
+return bufferValue;
+}
+}
+
+
+static DeserializationResult duckTypeV2(final ConsumerRecord record, final Bytes key) {
+DeserializationResult deserializationResult = null;
+RuntimeException v2DeserializationException = null;
+RuntimeException v3DeserializationException = null;
+try {
+deserializationResult = deserializeV2(record, key);
+} catch (final RuntimeException e) {
+v2DeserializationException = e;
+}
+// versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a 
V3 record with the
+// V2 header, so we'll try duck-typing to see if this is decodable as 
V3
+if (deserializationResult == null) {
+try {
+deserializationResult = deserializeV3(record, key);
+} catch (final RuntimeException e) {
+v3DeserializationException = e;
+}
+}
+
+if (deserializationResult == null) {
+// ok, it wasn't V3 either. Throw both exceptions:
+final RuntimeException exception =
+new RuntimeException("Couldn't deserialize record as v2 or v3: 
" + record,
+ v2DeserializationException);
+exception.addSuppressed(v3DeserializationException);
+throw exception;
+}
+return deserializationResult;
+}
+
+private static DeserializationResult deserializeV2(final 
ConsumerRecord record,

Review comment:
   sure thing!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446363798



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##
@@ -361,26 +366,20 @@ private void restoreBatch(final 
Collection> batch
 contextualRecord.recordContext()
 )
 );
-} else if 
(V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v")))
 {
-// in this case, the changelog value is a serialized 
BufferValue
+} else if (Arrays.equals(versionHeader.value(), 
V_2_CHANGELOG_HEADER_VALUE)) {
+
+final DeserializationResult deserializationResult = 
duckTypeV2(record, key);

Review comment:
   Sorry, the comments in `duckTypeV2`.
   
   Basically, because we released three versions that would write data in the 
"v3" format, but with the "v2" flag, when we see the v2 flag, the data might be 
in v2 format or v3 format. The only way to tell is to just try to deserialize 
it in v2 format, and if we get an exception, then to try with v3 format.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


guozhangwang commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446343541



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -189,8 +192,8 @@ def test_upgrade_downgrade_brokers(self, from_version, 
to_version):
 processor.stop()
 processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" 
% processor.STDOUT_FILE, allow_fail=False)
 
-@matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
-def test_simple_upgrade_downgrade(self, from_version, to_version):
+@matrix(from_version=smoke_test_versions, to_version=dev_version)

Review comment:
   +1, I think this is a great find.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##
@@ -361,26 +366,20 @@ private void restoreBatch(final 
Collection> batch
 contextualRecord.recordContext()
 )
 );
-} else if 
(V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v")))
 {
-// in this case, the changelog value is a serialized 
BufferValue
+} else if (Arrays.equals(versionHeader.value(), 
V_2_CHANGELOG_HEADER_VALUE)) {
+
+final DeserializationResult deserializationResult = 
duckTypeV2(record, key);

Review comment:
   Could you clarify which comment are you referring to? I did not see any 
comments for the "restoreBatch" method..

##
File path: 
streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static 
org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {

Review comment:
   I'm assuming the 22..25 client / driver code are all copy-pastes here, so I 
skipped reviewing them. LMK if they aren't.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+
+import java.nio.ByteBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+final class TimeOrderedKeyValueBufferChangelogDeserializationHelper {
+private TimeOrderedKeyValueBufferChangelogDeserializationHelper() {}
+
+static final class DeserializationResult {
+private final long time;
+private final Bytes key;
+private final BufferValue bufferValue;
+
+private DeserializationResult(final long time, final Bytes key, final 
BufferValue bufferValue) {
+this.time = 

[GitHub] [kafka] ijuma commented on pull request #8579: KAFKA-9930: Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion.

2020-06-26 Thread GitBox


ijuma commented on pull request #8579:
URL: https://github.com/apache/kafka/pull/8579#issuecomment-650319283


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest - call KafkaStreams#cleanU…

2020-06-26 Thread GitBox


ableegoldman commented on pull request #8913:
URL: https://github.com/apache/kafka/pull/8913#issuecomment-650316087


   The system tests I'm triggering keep failing, but not because of this PR. 
Not sure what's going on :/
   
   But I'll try again: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3990/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] efeg commented on a change in pull request #8579: KAFKA-9930: Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion.

2020-06-26 Thread GitBox


efeg commented on a change in pull request #8579:
URL: https://github.com/apache/kafka/pull/8579#discussion_r446327952



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -382,6 +382,12 @@ abstract class AbstractFetcherThread(name: String,
 "that the partition is being moved")
   partitionsWithError += topicPartition
 
+case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+  warn(s"Receiving ${Errors.UNKNOWN_TOPIC_OR_PARTITION} from 
the leader for partition $topicPartition. " +
+   s"This can happen transiently if the partition is being 
created or deleted. " +
+   s"However, this is unexpected if it sustains.")

Review comment:
   @ijuma thanks for the update -- let me know if further changes needed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy closed pull request #8932: MINOR: rename "NOT_INITALIZED" to "NOT_INITIALIZED"

2020-06-26 Thread GitBox


omkreddy closed pull request #8932:
URL: https://github.com/apache/kafka/pull/8932


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] andrewchoi5 commented on a change in pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-06-26 Thread GitBox


andrewchoi5 commented on a change in pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#discussion_r446318540



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -499,7 +500,16 @@ class Partition(val topicPartition: TopicPartition,
 addingReplicas = addingReplicas,
 removingReplicas = removingReplicas
   )
-  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints)
+  try {
+this.createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints)
+  } catch {
+case e: ZooKeeperClientException =>
+  stateChangeLogger.info(s"A ZooKeeper client exception has occurred 
and makeLeader will be skipping the " +
+s"state change for the partition with leader epoch: $leaderEpoch 
", e)
+  error(s"ZooKeeper client error occurred while this partition was 
becoming the leader for $topicPartition.", e)

Review comment:
   Makes sense. Would appreciate a review on the change -- @junrao .





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-26 Thread GitBox


ableegoldman commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r446316588



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
##
@@ -124,18 +141,19 @@ public void before() {
 Serdes.String()
 );
 metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
-expect(context.metrics())
-.andReturn(new StreamsMetricsImpl(metrics, "test", 
builtInMetricsVersion)).anyTimes();
-expect(context.taskId()).andReturn(taskId).anyTimes();
-expect(inner.name()).andReturn("metered").anyTimes();
+expect(context.applicationId()).andStubReturn(APPLICATION_ID);
+expect(context.metrics()).andStubReturn(new 
StreamsMetricsImpl(metrics, "test", builtInMetricsVersion));
+expect(context.taskId()).andStubReturn(taskId);
+
expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);

Review comment:
   I think Bruno needs to give us all a short lesson on the correct usage 
of EasyMock. He explained the `StubReturn` thing to me on another PR a while 
back and I've been (slowly) trying to help migrate all the 
`.andReturn.anyTimes` usages over to this where appropriate (which is most 
places). It's definitely helped reduce the large number of EasyMock'ed tests 
that have to be fixed after every minor implementation change.
   
   That said, there does seem to be one critical difference between using 
`.andStubReturn` and the actual `MockInternalProcessorContext`. If we add a new 
method to the `InternalProcessorContext` interface, for example, we then have 
to add this expectation to every test that calls it with an EasyMock context. 
Having had to do this a number of times, it's definitely a huge timesuck.
   
   But I also agree that it doesn't need to be done as part of this PR. Maybe 
once the MockInternal and InternalMock processor contexts are finally merged we 
can do some reasonable cleanup of the tests.
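   For reference, the two idioms being contrasted, on a toy mocked interface (the 
interface itself is made up):
    
    ```java
    import static org.easymock.EasyMock.createNiceMock;
    import static org.easymock.EasyMock.expect;
    import static org.easymock.EasyMock.replay;
    import static org.easymock.EasyMock.verify;
    
    public class StubVsAnyTimesExample {
    
        interface Context {
            String applicationId();
        }
    
        void example() {
            final Context context = createNiceMock(Context.class);
    
            // Stub return: "I don't care whether or how often this is called,
            // just answer sensibly when it is."
            expect(context.applicationId()).andStubReturn("app-id");
    
            // The older idiom: records an expectation that also allows any number
            // of calls (including zero), but reads as an expectation rather than a stub.
            // expect(context.applicationId()).andReturn("app-id").anyTimes();
    
            replay(context);
            // ... exercise the code under test ...
            verify(context);
        }
    }
    ```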





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-06-26 Thread GitBox


junrao commented on a change in pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#discussion_r446316668



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -499,7 +500,16 @@ class Partition(val topicPartition: TopicPartition,
 addingReplicas = addingReplicas,
 removingReplicas = removingReplicas
   )
-  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints)
+  try {
+this.createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints)
+  } catch {
+case e: ZooKeeperClientException =>
+  stateChangeLogger.info(s"A ZooKeeper client exception has occurred 
and makeLeader will be skipping the " +
+s"state change for the partition with leader epoch: $leaderEpoch 
", e)
+  error(s"ZooKeeper client error occurred while this partition was 
becoming the leader for $topicPartition.", e)

Review comment:
    I meant removing the line at 509, i.e. the `error(s"ZooKeeper client ...` call.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9693) Kafka latency spikes caused by log segment flush on roll

2020-06-26 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146527#comment-17146527
 ] 

Jun Rao commented on KAFKA-9693:


[~joliveirinha]: Thanks for the analysis. To trigger the flush during log 
append, did you set log.flush.interval.messages to 1? The high remote time in 
produce could be caused by (1) the processing for fetch follower request on the 
leader is long or (2) the follower is not issuing fetch request quick enough. 
Since fetchFollower time is not high, it doesn't seem (1) is the cause. (2) can 
be caused by follower log append delay. How many partitions did you use in the 
test? A fetch follower request can include multiple partitions. So, it can add 
up. (2) can also be caused by replication throttling. The non-zero follower 
throttling time typically indicates that replication throttling is engaged.

> Kafka latency spikes caused by log segment flush on roll
> 
>
> Key: KAFKA-9693
> URL: https://issues.apache.org/jira/browse/KAFKA-9693
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
> Environment: OS: Amazon Linux 2
> Kafka version: 2.2.1
>Reporter: Paolo Moriello
>Assignee: Paolo Moriello
>Priority: Major
>  Labels: Performance, latency, performance
> Attachments: image-2020-03-10-13-17-34-618.png, 
> image-2020-03-10-14-36-21-807.png, image-2020-03-10-15-00-23-020.png, 
> image-2020-03-10-15-00-54-204.png, image-2020-06-23-12-24-46-548.png, 
> image-2020-06-23-12-24-58-788.png, image-2020-06-26-13-43-21-723.png, 
> image-2020-06-26-13-46-52-861.png, image-2020-06-26-14-06-01-505.png, 
> latency_plot2.png
>
>
> h1. Summary
> When a log segment fills up, Kafka rolls over onto a new active segment and 
> forces the flush of the old segment to disk. When this happens, log segment 
> _append_ duration increases, causing significant latency spikes on producer(s) 
> and replica(s). This ticket aims to highlight the problem and propose a 
> simple mitigation: add a new configuration to enable/disable the rolled segment 
> flush.
> h1. 1. Phenomenon
> Response time of produce request (99th ~ 99.9th %ile) repeatedly spikes to 
> ~50x-200x more than usual. For instance, normally 99th %ile is lower than 
> 5ms, but when this issue occurs, it marks 100ms to 200ms. 99.9th and 99.99th 
> %iles even jump to 500-700ms.
> Latency spikes happen at constant frequency (depending on the input 
> throughput), for small amounts of time. All the producers experience a 
> latency increase at the same time.
> h1. !image-2020-03-10-13-17-34-618.png|width=942,height=314!
> {{Example of response time plot observed on a single producer.}}
> URPs rarely appear in correspondence of the latency spikes too. This is 
> harder to reproduce, but from time to time it is possible to see a few 
> partitions going out of sync in correspondence of a spike.
> h1. 2. Experiment
> h2. 2.1 Setup
> Kafka cluster hosted on AWS EC2 instances.
> h4. Cluster
>  * 15 Kafka brokers: (EC2 m5.4xlarge)
>  ** Disk: 1100Gb EBS volumes (4750Mbps)
>  ** Network: 10 Gbps
>  ** CPU: 16 Intel Xeon Platinum 8000
>  ** Memory: 64Gb
>  * 3 Zookeeper nodes: m5.large
>  * 6 producers on 6 EC2 instances in the same region
>  * 1 topic, 90 partitions - replication factor=3
> h4. Broker config
> Relevant configurations:
> {quote}num.io.threads=8
>  num.replica.fetchers=2
>  offsets.topic.replication.factor=3
>  num.network.threads=5
>  num.recovery.threads.per.data.dir=2
>  min.insync.replicas=2
>  num.partitions=1
> {quote}
> h4. Perf Test
>  * Throughput ~6000-8000 (~40-70Mb/s input + replication = ~120-210Mb/s per 
> broker)
>  * record size = 2
>  * Acks = 1, linger.ms = 1, compression.type = none
>  * Test duration: ~20/30min
> h2. 2.2 Analysis
> Our analysis showed a high +correlation between log segment flush count/rate 
> and the latency spikes+. This indicates that the spikes in max latency are 
> related to Kafka behavior on rolling over new segments.
> The other metrics did not show any relevant impact on any hardware component 
> of the cluster, eg. cpu, memory, network traffic, disk throughput...
>  
>  !latency_plot2.png|width=924,height=308!
>  {{Correlation between latency spikes and log segment flush count. p50, p95, 
> p99, p999 and p latencies (left axis, ns) and the flush #count (right 
> axis, stepping blue line in plot).}}
> Kafka schedules logs flushing (this includes flushing the file record 
> containing log entries, the offset index, the timestamp index and the 
> transaction index) during _roll_ operations. A log is rolled over onto a new 
> empty log when:
>  * the log segment is full
>  * the maxtime has elapsed since the timestamp of first message in the 
> segment (or, in absence of it, since the create time)
>  * the index 

[GitHub] [kafka] ijuma commented on pull request #8931: MINOR: Update Scala to 2.13.3

2020-06-26 Thread GitBox


ijuma commented on pull request #8931:
URL: https://github.com/apache/kafka/pull/8931#issuecomment-650297987


   One job passed, two had a single unrelated test failure.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #8931: MINOR: Update Scala to 2.13.3

2020-06-26 Thread GitBox


ijuma merged pull request #8931:
URL: https://github.com/apache/kafka/pull/8931


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10166) Excessive TaskCorruptedException seen in testing

2020-06-26 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146501#comment-17146501
 ] 

Sophie Blee-Goldman commented on KAFKA-10166:
-

Thanks for the detailed analysis Bruno. In that case we are just waiting on the 
one PR, which should be mergeable sometime today

> Excessive TaskCorruptedException seen in testing
> 
>
> Key: KAFKA-10166
> URL: https://issues.apache.org/jira/browse/KAFKA-10166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.6.0
>
>
> As the title indicates, long-running test applications with injected network 
> "outages" seem to hit TaskCorruptedException more than expected.
> Seen occasionally on the ALOS application (~20 times in two days in one case, 
> for example), and very frequently with EOS (many times per day)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] andrewchoi5 commented on a change in pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-06-26 Thread GitBox


andrewchoi5 commented on a change in pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#discussion_r446309532



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -499,7 +500,16 @@ class Partition(val topicPartition: TopicPartition,
 addingReplicas = addingReplicas,
 removingReplicas = removingReplicas
   )
-  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints)
+  try {
+this.createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints)
+  } catch {
+case e: ZooKeeperClientException =>
+  stateChangeLogger.info(s"A ZooKeeper client exception has occurred 
and makeLeader will be skipping the " +
+s"state change for the partition with leader epoch: $leaderEpoch 
", e)
+  error(s"ZooKeeper client error occurred while this partition was 
becoming the leader for $topicPartition.", e)

Review comment:
   Thanks @junrao -- I will change the logging level to error. 
   Could you clarify what you mean by keeping the state change logging?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

2020-06-26 Thread GitBox


ableegoldman commented on a change in pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#discussion_r446308395



##
File path: gradle/spotbugs-exclude.xml
##
@@ -348,14 +348,11 @@ For a detailed description of spotbugs bug categories, 
see https://spotbugs.read
 
 
 
-
-
-
+
 
-
-
-
-
+
+

Review comment:
   Fair enough. I eagerly await your alternative proposal :P 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-06-26 Thread GitBox


junrao commented on a change in pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#discussion_r446306280



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -499,7 +500,16 @@ class Partition(val topicPartition: TopicPartition,
 addingReplicas = addingReplicas,
 removingReplicas = removingReplicas
   )
-  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints)
+  try {
+this.createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints)
+  } catch {
+case e: ZooKeeperClientException =>
+  stateChangeLogger.info(s"A ZooKeeper client exception has occurred 
and makeLeader will be skipping the " +
+s"state change for the partition with leader epoch: $leaderEpoch 
", e)
+  error(s"ZooKeeper client error occurred while this partition was 
becoming the leader for $topicPartition.", e)

Review comment:
   We probably can just keep the state change logging. Also, the logging 
level probably should be error instead of info. Ditto below.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task

2020-06-26 Thread GitBox


vvcephei commented on pull request #8926:
URL: https://github.com/apache/kafka/pull/8926#issuecomment-650288339


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task

2020-06-26 Thread GitBox


vvcephei commented on pull request #8926:
URL: https://github.com/apache/kafka/pull/8926#issuecomment-650288465


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-26 Thread GitBox


abbccdda commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-650275504


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8914: MINOR: Do not swallow exception when collecting PIDs

2020-06-26 Thread GitBox


abbccdda commented on pull request #8914:
URL: https://github.com/apache/kafka/pull/8914#issuecomment-650275130


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

2020-06-26 Thread GitBox


vvcephei commented on pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#issuecomment-650270782







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams

2020-06-26 Thread GitBox


vvcephei commented on pull request #8929:
URL: https://github.com/apache/kafka/pull/8929#issuecomment-650271007


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility

2020-06-26 Thread GitBox


vvcephei commented on a change in pull request #8905:
URL: https://github.com/apache/kafka/pull/8905#discussion_r446265559



##
File path: build.gradle
##
@@ -97,7 +97,7 @@ ext {
   buildVersionFileName = "kafka-version.properties"
 
   defaultMaxHeapSize = "2g"
-  defaultJvmArgs = ["-Xss4m", "-XX:+UseParallelGC"]
+  defaultJvmArgs = ["-Xss4m"]

Review comment:
   @ijuma , you'll probably want to know about this.
   
   I have no idea why, but one of the new tests in this PR was failing with:
   ```
   java.lang.OutOfMemoryError: Java heap space
   at 
org.apache.kafka.streams.kstream.internals.FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(FullChangeSerde.java:82)
   at 
org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV2(TimeOrderedKeyValueBufferChangelogDeserializationHelper.java:90)
   at 
org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2(TimeOrderedKeyValueBufferChangelogDeserializationHelper.java:61)
   at 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:369)
   at 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$284/0x0001002cb440.restoreBatch(Unknown
 Source)
   at 
org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferTest.shouldRestoreV3FormatWithV2Header(TimeOrderedKeyValueBufferTest.java:742)
   ```
   
   I captured a flight recording and a heap dump on exit, but everything looked 
fine, and the heap was only a few megs at the time of the crash. I noticed 
first that if I just overrode all the jvm args, the test would pass, and 
through trial and error, I identified this one as the "cause".
   
   I get an OOME every time with `-XX:+UseParallelGC`, and I've never gotten it 
without the flag. WDYT about dropping it?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
##
@@ -56,14 +55,13 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.CHANGELOG_HEADERS;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
 public class TimeOrderedKeyValueBufferTest> {
-private static final RecordHeaders V_2_CHANGELOG_HEADERS =
-new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] 
{(byte) 2})});

Review comment:
   imported the headers from the production code, so that it'll stay 
current.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
##
@@ -104,10 +104,6 @@ public void shouldWorkWithRebalance() throws 
InterruptedException {
 clients.add(smokeTestClient);
 smokeTestClient.start(props);
 
-while (!clients.get(clients.size() - 1).started()) {
-Thread.sleep(100);
-}
-

Review comment:
   Don't need this anymore because `start` blocks until it's "started" now.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##
@@ -38,107 +37,128 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.TestUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
 private final String name;
 
-private Thread thread;
 private KafkaStreams streams;
 private boolean uncaughtException = false;
-private boolean started;
-private boolean closed;
+private volatile boolean closed;
 
-public SmokeTestClient(final String name) {
-super();
-this.name = name;
+private static void addShutdownHook(final String name, final Runnable 
runnable) {
+if (name != null) {
+Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, 
runnable));
+} else {
+Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+}
 }
 
-public boolean started() {
-return started;
+private static File tempDirectory() {
+final String prefix = "kafka-";
+final File file;
+try {
+file = 

[jira] [Commented] (KAFKA-10206) Admin can transiently return incorrect results about topics

2020-06-26 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146444#comment-17146444
 ] 

Tom Bentley commented on KAFKA-10206:
-

I think the broker needs some alternative to replying with invalid data until 
it has received topic data from the controller.
There are no meaningful retriable errors which the broker could return (and 
older clients would not expect these).
The broker could delay responding to a metadata request until it had received 
topic data from the controller. That's complicated by the fact that the initial 
UPDATE_METADATA request from the controller lacks topic data. While using a 
counter would work most of the time, it is not safe if the controller didn't 
send the 2nd UPDATE_METADATA request (e.g. due to controller failover). An 
alternative to using a counter would be to distinguish in the UPDATE_METADATA 
request between an empty topic list and a null topic list.

Then there's the question of how long the broker should wait before responding. 
The alternative to waiting would be to return some new retriable error code to 
the client (which could then try another broker).

[~ijuma], [~cmccabe] do you have any better ideas about how best to address 
this?
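
Until there is a broker-side fix, the only caller-side mitigation seems to be 
retrying until the answer stabilizes. A rough sketch (the timeout and the 
non-empty check are arbitrary, and of course this cannot distinguish "no topics 
yet" from "genuinely no topics", which is exactly the problem):

{code:java}
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;

final class ListTopicsWithRetry {

    // Retries listTopics for a short window, since a freshly started broker can
    // transiently report an empty topic list before it receives UPDATE_METADATA.
    static Set<String> listTopics(final Admin admin, final long timeoutMs)
            throws ExecutionException, InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        Set<String> names = admin.listTopics().names().get();
        while (names.isEmpty() && System.currentTimeMillis() < deadline) {
            Thread.sleep(200);
            names = admin.listTopics().names().get();
        }
        return names;
    }
}
{code}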

> Admin can transiently return incorrect results about topics
> ---
>
> Key: KAFKA-10206
> URL: https://issues.apache.org/jira/browse/KAFKA-10206
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Major
>
> When a broker starts up it can handle metadata requests before it has 
> received UPDATE_METADATA requests from the controller. 
> This manifests in the admin client via:
> * listTopics returning an empty list
> * describeTopics and describeConfigs of topics erroneously returning 
> TopicOrPartitionNotFoundException
> I assume this also affects the producer and consumer, though since 
> `UnknownTopicOrPartitionException` is retriable those clients recover.
> Testing locally suggests that the window for this happening is typically <1s.
> There doesn't seem to be any way for the caller of the Admin client to detect 
> this situation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10206) Admin can transiently return incorrect results about topics

2020-06-26 Thread Tom Bentley (Jira)
Tom Bentley created KAFKA-10206:
---

 Summary: Admin can transiently return incorrect results about 
topics
 Key: KAFKA-10206
 URL: https://issues.apache.org/jira/browse/KAFKA-10206
 Project: Kafka
  Issue Type: Bug
  Components: admin, core
Reporter: Tom Bentley
Assignee: Tom Bentley


When a broker starts up it can handle metadata requests before it has 
received UPDATE_METADATA requests from the controller. 
This manifests in the admin client via:

* listTopics returning an empty list
* describeTopics and describeConfigs of topics erroneously returning 
TopicOrPartitionNotFoundException

I assume this also affects the producer and consumer, though since 
`UnknownTopicOrPartitionException` is retriable those clients recover.

Testing locally suggests that the window for this happening is typically <1s.

There doesn't seem to be any way for the caller of the Admin client to detect 
this situation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task

2020-06-26 Thread GitBox


guozhangwang commented on pull request #8926:
URL: https://github.com/apache/kafka/pull/8926#issuecomment-650261830


   > Seems like all the Topology testDriver tests failed, but I got a green 
build running locally. Do they not run with `./gradlew streams:test`?
   
    They are included in streams:test. Maybe try to rebase the branch and see if 
there are any missing commits?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task

2020-06-26 Thread GitBox


ableegoldman commented on pull request #8926:
URL: https://github.com/apache/kafka/pull/8926#issuecomment-650251400


    Seems like all the TopologyTestDriver tests failed, but I got a green build 
running locally. Do they not run with `./gradlew streams:test`?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8927: KAFKA-10200: Fix testability of PAPI with windowed stores

2020-06-26 Thread GitBox


vvcephei commented on a change in pull request #8927:
URL: https://github.com/apache/kafka/pull/8927#discussion_r446263222



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+public final class ProcessorContextUtils {

Review comment:
   Thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



