[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146776#comment-17146776 ] Sean Guo commented on KAFKA-10134: -- We can try the patch next week to see whether it also addresses the issue we hit, since we had also seen a lot of coordinator-lookup operations when this issue happens. > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > > We want to use the new rebalance protocol to mitigate the stop-the-world effect during rebalances, as our tasks are long-running. > But after the upgrade, when we kill an instance to trigger a rebalance while there is some load (some tasks run longer than 30 s), the CPU goes sky-high. It reads ~700% in our metrics, so several threads must be in a tight loop. We have several consumer threads consuming from different partitions during the rebalance. This is reproducible with both the new CooperativeStickyAssignor and the old eager rebalance protocol. The difference is that with the old eager protocol the high CPU usage drops after the rebalance is done, whereas with the cooperative one the consumer threads seem to be stuck on something and cannot finish the rebalance, so the high CPU usage does not drop until we stop our load. A small load without long-running tasks does not cause continuous high CPU usage either, as the rebalance can finish in that case.
> > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) at > > By debugging into the code, we found that the clients appear to be looping on finding the coordinator. > I also tried the old rebalance protocol with the new version: the issue still exists, but the CPU goes back to normal once the rebalance is done. > I also tried the same on 2.4.1, which does not seem to have this issue, so it seems related to something that changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
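The report points at consumer threads looping on coordinator discovery. The sketch below is a simplified model under stated assumptions, not the consumer's actual code and not the eventual fix: time is simulated and advances only by a hypothetical backoff between lookup attempts, so a zero backoff shows how an unthrottled retry loop keeps a thread busy until something external stops it. `CoordinatorRetrySketch` and `attemptsBeforeDeadline` are hypothetical names.

```java
import java.util.function.BooleanSupplier;

/**
 * Simplified model (hypothetical, not Kafka's consumer code) of a coordinator-lookup
 * retry loop. Time is simulated and advances only by the backoff between attempts,
 * so backoffMs == 0 models a loop that never waits between lookups.
 */
final class CoordinatorRetrySketch {

    private CoordinatorRetrySketch() {}

    /**
     * Counts lookup attempts until the lookup succeeds, the simulated deadline passes,
     * or the safety cap is reached (the cap stands in for "until something external stops it").
     */
    static int attemptsBeforeDeadline(BooleanSupplier coordinatorFound,
                                      long deadlineMs, long backoffMs, int maxAttempts) {
        long clockMs = 0L; // simulated clock
        int attempts = 0;
        while (clockMs < deadlineMs && attempts < maxAttempts) {
            attempts++;
            if (coordinatorFound.getAsBoolean()) {
                break; // coordinator known: stop retrying
            }
            // Hypothetical backoff before the next attempt (the real consumer has a
            // retry.backoff.ms setting; whether it applied in this bug is not claimed).
            // With backoffMs == 0 the simulated clock never advances.
            clockMs += backoffMs;
        }
        return attempts;
    }
}
```

With a 100 ms backoff and a 1,000 ms deadline the model makes 10 attempts; with no backoff it runs until the safety cap, the kind of pattern that shows up as a RUNNABLE thread burning CPU in a thread dump.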
[GitHub] [kafka] guozhangwang commented on pull request #8925: KAFKA-9974: Make produce-sync flush
guozhangwang commented on pull request #8925: URL: https://github.com/apache/kafka/pull/8925#issuecomment-650487412 > Could we include the purpose of this PR in the title, such as "Integration test shouldApplyUpdatesToStandbyStore fix:... " SG This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #8925: KAFKA-9974: Make produce-sync flush
abbccdda commented on pull request #8925: URL: https://github.com/apache/kafka/pull/8925#issuecomment-650486850 retest this please
[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on pull request #8905: URL: https://github.com/apache/kafka/pull/8905#issuecomment-650483907 backported to 2.6, 2.5, and 2.4. I ran the streams and client tests each time, as well as systemTestLibs.
[jira] [Commented] (KAFKA-10173) BufferUnderflowException during Kafka Streams Upgrade
[ https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146758#comment-17146758 ] John Roesler commented on KAFKA-10173: -- Ok, the fix is now backported all the way back to 2.4. I'll leave this ticket open until I get the system test PR in. > BufferUnderflowException during Kafka Streams Upgrade > - > > Key: KAFKA-10173 > URL: https://issues.apache.org/jira/browse/KAFKA-10173 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Karsten Schnitter >Assignee: John Roesler >Priority: Blocker > Labels: suppress > Fix For: 2.6.0, 2.4.2, 2.5.1 > > > I migrated a Kafka Streams application from version 2.3.1 to 2.5.0. I > followed the steps described in the upgrade guide and set the property > {{migrate.from=2.3}}. On my dev system with just one running instance I got > the following exception: > {noformat} > stream-thread [0-StreamThread-2] Encountered the following error during > processing: > java.nio.BufferUnderflowException: null > at java.base/java.nio.HeapByteBuffer.get(Unknown Source) > at java.base/java.nio.ByteBuffer.get(Unknown Source) > at > org.apache.kafka.streams.state.internals.BufferValue.extractValue(BufferValue.java:94) > at > org.apache.kafka.streams.state.internals.BufferValue.deserialize(BufferValue.java:83) > at > org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:368) > at > org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89) > at > org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) > at > 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) > {noformat} > I found that this problem only occurs for stores where I use the suppress feature. If I rename the changelog topics during the migration, the problem does not occur.
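The stack trace shows `BufferValue.deserialize` running out of bytes while the suppression buffer is restored from its changelog, and renaming the changelog topics (that is, starting from an empty changelog) avoids it. PR #8905 describes the fix as restoring binary schema compatibility of the suppress changelog. As a hedged illustration of that class of bug only, the sketch below uses two hypothetical layouts (not the real Kafka Streams format, and not the actual fix): bytes written in the older layout underflow when read with the newer decoder, while a reader told which version wrote the bytes decodes both. All names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical layouts (NOT the real Kafka Streams BufferValue format) showing how
 * old-format bytes underflow under a newer decoder, and how a version tag avoids it.
 */
final class VersionedValueSketch {

    private VersionedValueSketch() {}

    // Hypothetical old layout (version 1): [int length][value bytes]
    static byte[] encodeV1(String value) {
        byte[] v = value.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + v.length).putInt(v.length).put(v).array();
    }

    // Hypothetical new layout (version 2): three length-prefixed fields [prior][old][new]
    static byte[] encodeV2(String prior, String oldValue, String newValue) {
        byte[][] fields = {
            prior.getBytes(StandardCharsets.UTF_8),
            oldValue.getBytes(StandardCharsets.UTF_8),
            newValue.getBytes(StandardCharsets.UTF_8)
        };
        int size = 0;
        for (byte[] f : fields) {
            size += 4 + f.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (byte[] f : fields) {
            buf.putInt(f.length).put(f);
        }
        return buf.array();
    }

    // Reads one length-prefixed field; getInt/get throw BufferUnderflowException
    // when fewer bytes remain than the layout expects.
    private static String readField(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getInt()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Assumes the version-2 layout unconditionally (the fragile approach).
    static String[] decodeV2(ByteBuffer buf) {
        // Array initializer elements are evaluated left to right, so fields are read in order.
        return new String[] {readField(buf), readField(buf), readField(buf)};
    }

    // Version-aware decoding: the caller passes the version the bytes were written with
    // (in practice that could come from a header or a leading version byte; both hypothetical here).
    static String[] decode(byte[] bytes, int version) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        switch (version) {
            case 1:
                // Old records carried only the new value; prior and old values are unknown.
                return new String[] {null, null, readField(buf)};
            case 2:
                return decodeV2(buf);
            default:
                throw new IllegalArgumentException("unknown version " + version);
        }
    }
}
```

Decoding `encodeV1("hello")` with `decodeV2` reads the first field successfully and then throws `BufferUnderflowException` on the second length prefix, which mirrors the shape of the failure in the report; `decode(bytes, 1)` reads the same bytes without error.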
[jira] [Commented] (KAFKA-10207) Untrimmed Index files cause premature log segment deletions on startup
[ https://issues.apache.org/jira/browse/KAFKA-10207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146757#comment-17146757 ] Johnny Malizia commented on KAFKA-10207: I didn't see where to assign this to me, but I went ahead and submitted a change to handle this scenario more gracefully. [https://github.com/apache/kafka/pull/8936] With that said, it does seem to go directly against part of the benefits of KIP-263, so I am open to alternatives.
[GitHub] [kafka] Johnny-Malizia opened a new pull request #8936: Fixed padded timeindex causing premature data deletion
Johnny-Malizia opened a new pull request #8936: URL: https://github.com/apache/kafka/pull/8936 In some cases when a new log segment is rolled, the previous segment's time index and offset index do not have the excess 0 bytes trimmed. As a result, the next load from disk triggers retention-window breaches and the data is deleted. This happens because the mmap over the index data seeks to the end, and when the end is padded with 0 bytes, the last loaded timestamp is 0. The sanity checks that would catch this were explicitly skipped for inactive segments by KIP-263. A unit test was added to confirm that the sanity check on the time index does catch this scenario. There doesn't seem to be existing test coverage for LogSegment.sanityCheck(), so I opted to test the specific functionality and rely on the same logic that TransactionIndex currently relies on. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
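The DEBUG line in the issue description is consistent with 12-byte time-index entries (an 8-byte timestamp plus a 4-byte relative offset): 10485760 / 12 rounds down to 873813 entries, and 873813 × 12 = 10485756, exactly the reported file position. In other words, the loader treated the whole preallocated file as full, so the last slot it read was zero padding. The sketch below is a simplified model of that, not Kafka's `TimeIndex`; the class name, the sanity check, and the retention rule are hypothetical simplifications of the ideas in the PR and issue text.

```java
import java.nio.ByteBuffer;

/**
 * Simplified model (not Kafka's TimeIndex) of a preallocated, zero-filled time index
 * with 12-byte entries: an 8-byte timestamp followed by a 4-byte relative offset.
 */
final class PaddedTimeIndexSketch {

    static final int ENTRY_SIZE = 12;

    private PaddedTimeIndexSketch() {}

    // Writes entries into a zero-filled buffer of maxIndexSize bytes. The buffer's
    // position afterwards marks the bytes really written, i.e. what trimming would keep.
    static ByteBuffer preallocate(int maxIndexSize, long[] timestamps) {
        ByteBuffer buf = ByteBuffer.allocate(maxIndexSize); // allocate() zero-fills
        for (int i = 0; i < timestamps.length; i++) {
            buf.putLong(timestamps[i]).putInt(i); // relative offset i is illustrative only
        }
        return buf;
    }

    // Untrimmed load: every whole 12-byte slot in the file counts as an entry,
    // so the "last" entry is read from the zero padding.
    static long lastTimestampIfUntrimmed(ByteBuffer buf) {
        int entries = buf.capacity() / ENTRY_SIZE;
        return buf.getLong((entries - 1) * ENTRY_SIZE);
    }

    // Trimmed load: only the bytes actually written count (assumes at least one entry).
    static long lastTimestampIfTrimmed(ByteBuffer buf) {
        int entries = buf.position() / ENTRY_SIZE;
        return buf.getLong((entries - 1) * ENTRY_SIZE);
    }

    // Hypothetical sanity check: timestamps in a non-empty time index should not go
    // backwards, so a last timestamp below the first one signals corruption.
    static boolean looksCorrupt(long firstTimestamp, long lastTimestamp) {
        return lastTimestamp < firstTimestamp;
    }

    // Simplified time-based retention: a segment is deletable once its largest
    // timestamp is more than retentionMs older than now.
    static boolean breachesRetention(long nowMs, long largestTimestampMs, long retentionMs) {
        return nowMs - largestTimestampMs > retentionMs;
    }
}
```

With a largest timestamp of 0, `nowMs - 0` exceeds any realistic retention, which matches the `largestTime=0` segments scheduled for deletion in the report; the same segment with its real last timestamp is well inside a 7-day window.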
[jira] [Updated] (KAFKA-10207) Untrimmed Index files cause premature log segment deletions on startup
[ https://issues.apache.org/jira/browse/KAFKA-10207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johnny Malizia updated KAFKA-10207: --- Description: [KIP-263|https://cwiki.apache.org/confluence/display/KAFKA/KIP-263%3A+Allow+broker+to+skip+sanity+check+of+inactive+segments+on+broker+startup#KIP263:Allowbrokertoskipsanitycheckofinactivesegmentsonbrokerstartup-Evaluation] appears to have introduced a change explicitly deciding not to call the sanityCheck method on the time or offset index files that Kafka loads at startup. I found a particularly nasty bug with the following configuration: {code:java} jvm=1.8.0_191 zfs=0.6.5.6 kernel=4.4.0-1013-aws kafka=2.4.1{code} The bug was that the retention period for a topic, or even the broker-level configuration, seemed not to be respected: no matter what, when the broker started up it decided that all log segments on disk were breaching the retention window, and the data was purged. {code:java} Found deletable segments with base offsets [11610665,12130396,12650133] due to retention time 8640ms breach {code} {code:java} Rolled new log segment at offset 12764291 in 1 ms.
(kafka.log.Log) Scheduling segments for deletion List(LogSegment(baseOffset=11610665, size=1073731621, lastModifiedTime=1592532125000, largestTime=0), LogSegment(baseOffset=12130396, size=1073727967, lastModifiedTime=1592532462000, largestTime=0), LogSegment(baseOffset=12650133, size=235891971, lastModifiedTime=1592532531000, largestTime=0)) {code} Further logging showed that the issue happened when loading the files, indicating that the final writes to trim the index had not succeeded: {code:java} DEBUG Loaded index file /mnt/kafka-logs/test_topic-0/17221277.timeindex with maxEntries = 873813, maxIndexSize = 10485760, entries = 873813, lastOffset = TimestampOffset(0,17221277), file position = 10485756 (kafka.log.TimeIndex){code} It looks like the index file is initially preallocated (10 MB by default) and index entries are added over time. When it's time to roll to a new log segment, the index file is supposed to be trimmed, removing any 0 bytes left at the tail from the initial allocation, but in some cases that doesn't seem to happen. Because the 0 bytes at the tail may not have been removed, when the index is loaded again after restarting Kafka, the buffer seeks to the end, the last timestamp read is 0, and this leads to premature retention (TTL) deletion of the log segments. I tracked the issue down to the JVM version in use, as upgrading resolved it, but I think Kafka should never delete data by mistake like this: doing a rolling restart with this bug in place would cause complete data loss across the cluster. was: [KIP-263|https://cwiki.apache.org/confluence/display/KAFKA/KIP-263%3A+Allow+broker+to+skip+sanity+check+of+inactive+segments+on+broker+startup#KIP263:Allowbrokertoskipsanitycheckofinactivesegmentsonbrokerstartup-Evaluation] appears to have introduced a change explicitly deciding to not call the sanityCheck method on the time or offset index files that are loaded by Kafka at startup.
I found a particularly nasty bug using the following configuration {code:java} jvm=1.8.0_191 zfs=0.6.5.6 kernel=4.4.0-1013-aws kafka=2.4.1{code} The bug was that the retention period for a topic or even the broker level configuration seemed to not be respected, no matter what, when the broker started up it would decide that all log segments on disk were breaching the retention window and the data would be purged away. {code:java} Found deletable segments with base offsets [11610665,12130396,12650133] due to retention time 8640ms breach {code} {code:java} Rolled new log segment at offset 12764291 in 1 ms. (kafka.log.Log) Scheduling segments for deletion List(LogSegment(baseOffset=11610665, size=1073731621, lastModifiedTime=1592532125000, largestTime=0), LogSegment(baseOffset=12130396, size=1073727967, lastModifiedTime=1592532462000, largestTime=0), LogSegment(baseOffset=12650133, size=235891971, lastModifiedTime=1592532531000, largestTime=0)) {code} Further logging showed that this issue was happening when loading the files, indicating the final writes to trim the index were not successful {code:java} DEBUG Loaded index file /mnt/kafka-logs/test_topic-0/17221277.timeindex with maxEntries = 873813, maxIndexSize = 10485760, entries = 873813, lastOffset = TimestampOffset(0,17221277), file position = 10485756 (kafka.log.TimeIndex){code} So, because the index leaves the preallocated 0 bytes at the tail, when the index is loaded again after restarting Kafka, the next timestamp is 0 and this leads to a premature TTL deletion of the log segments. I tracked the issue down to being caused by the jvm version being used as upgrading resolved this issue, but I think that Kafka should never
[jira] [Commented] (KAFKA-10173) BufferUnderflowException during Kafka Streams Upgrade
[ https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146749#comment-17146749 ] John Roesler commented on KAFKA-10173: -- Ok, I've just merged the fix ([https://github.com/apache/kafka/pull/8905] ), and I'm backporting it now. To control the scope of the change, I've separated the upgrade system tests out for a second PR, which I'll submit next week. I did run them locally, and they pass after #8905, so I have confidence the bug is really fixed.
[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146747#comment-17146747 ] John Roesler commented on KAFKA-8630: - Hey, all, I inadvertently started a new ticket for this, KAFKA-10200, and already submitted a patch. Sorry if I stepped on any toes. > Unit testing a streams processor with a WindowStore throws a > ClassCastException > --- > > Key: KAFKA-8630 > URL: https://issues.apache.org/jira/browse/KAFKA-8630 > Project: Kafka > Issue Type: Bug > Components: streams-test-utils >Affects Versions: 2.3.0 >Reporter: Justin Fetherolf >Assignee: John Roesler >Priority: Critical > Fix For: 2.7.0, 2.6.1 > > > I was attempting to write a unit test for a class implementing the > {{Processor}} interface that contained a {{WindowStore}}, but running the > test fails with a {{ClassCastException}} coming out of > {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to > {{InternalProcessorContext}}. > Minimal code to reproduce: > {code:java} > package com.cantgetthistowork; > import org.apache.kafka.streams.processor.Processor; > import org.apache.kafka.streams.processor.ProcessorContext; > import org.apache.kafka.streams.state.WindowStore; > public class InMemWindowProcessor implements Processor<String, String> { > private ProcessorContext context; > private WindowStore<String, String> windowStore; > @Override > public void init(ProcessorContext context) { > this.context = context; > windowStore = (WindowStore<String, String>) > context.getStateStore("my-win-store"); > } > @Override > public void process(String key, String value) { > } > @Override > public void close() { > } > } > {code} > {code:java} > package com.cantgetthistowork; > import java.time.Duration; > import java.time.Instant; > import org.apache.kafka.common.serialization.Serdes; > import org.apache.kafka.streams.processor.MockProcessorContext; > import org.apache.kafka.streams.state.Stores; > import 
org.apache.kafka.streams.state.WindowStore; > import org.junit.Before; 
> import org.junit.Test; > public class InMemWindowProcessorTest { > InMemWindowProcessor processor = null; > MockProcessorContext context = null; > @Before > public void setup() { > processor = new InMemWindowProcessor(); > context = new MockProcessorContext(); > WindowStore store = > Stores.windowStoreBuilder( > Stores.inMemoryWindowStore( > "my-win-store", > Duration.ofMinutes(10), > Duration.ofSeconds(10), > false > ), > Serdes.String(), > Serdes.String() > ) > .withLoggingDisabled() > .build(); > store.init(context, store); > context.register(store, null); > processor.init(context); > } > @Test > public void testThings() { > Instant baseTime = Instant.now(); > context.setTimestamp(baseTime.toEpochMilli()); > context.setTopic("topic-name"); > processor.process("key1", "value1"); > } > } > {code} > > I was trying this with maven, with mvn --version outputting: > {noformat} > Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; > 2017-04-03T13:39:06-06:00) > Maven home: ~/opt/apache-maven-3.5.0 > Java version: 1.8.0_212, vendor: Oracle Corporation > Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre > Default locale: en_US, platform encoding: UTF-8 > OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: > "unix"{noformat} > And finally the stack trace: > {noformat} > --- > T E S T S > --- > Running com.cantgetthistowork.InMemWindowProcessorTest > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details. > Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< > FAILURE! > testThings(com.cantgetthistowork.InMemWindowProcessorTest) Time elapsed: > 0.05 sec <<< ERROR! 
> java.lang.ClassCastException: > org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to > org.apache.kafka.streams.processor.internals.InternalProcessorContext > at > org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90) > at > com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
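The stack trace shows `InMemoryWindowStore.init` casting the supplied context to the internal `InternalProcessorContext` type, which `MockProcessorContext` does not implement. The sketch below models only that mechanism with hypothetical stand-in types (none of them are Kafka classes): an unchecked downcast fails for a test double that implements just the public interface, while an `instanceof` guard is one possible way to tolerate it. It is not a description of the patch submitted under KAFKA-10200.

```java
/**
 * Hypothetical, simplified model of the failure: a store's init() downcasts the public
 * context type to an internal subtype, so a test double that implements only the
 * public type fails with ClassCastException. None of these types are Kafka's.
 */
final class ContextCastSketch {

    private ContextCastSketch() {}

    // Plays the role of the public ProcessorContext.
    interface PublicContext {
        String applicationId();
    }

    // Plays the role of the internal InternalProcessorContext.
    interface InternalContext extends PublicContext {
        long cacheSize();
    }

    // Plays the role of MockProcessorContext: implements only the public interface.
    static final class MockContext implements PublicContext {
        public String applicationId() { return "test-app"; }
    }

    // Plays the role of the runtime context: implements the internal interface too.
    static final class RealContext implements InternalContext {
        public String applicationId() { return "app"; }
        public long cacheSize() { return 10L; }
    }

    // Unchecked downcast, as in the failing init(): works only for internal implementations.
    static long initWithCast(PublicContext context) {
        InternalContext internal = (InternalContext) context; // ClassCastException for MockContext
        return internal.cacheSize();
    }

    // Defensive variant: use internal features only when they are actually available.
    static long initDefensively(PublicContext context) {
        if (context instanceof InternalContext) {
            return ((InternalContext) context).cacheSize();
        }
        return 0L; // hypothetical default for test contexts
    }
}
```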
[jira] [Assigned] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-8630: --- Assignee: John Roesler
[GitHub] [kafka] vvcephei merged pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei merged pull request #8905: URL: https://github.com/apache/kafka/pull/8905
[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on pull request #8905: URL: https://github.com/apache/kafka/pull/8905#issuecomment-650476597 Failures were unrelated: ``` kafka.api.PlaintextConsumerTest > testLowMaxFetchSizeForRequestAndPartition FAILED org.scalatest.exceptions.TestFailedException: Timed out before consuming expected 2700 records. The number consumed was 1077. at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at org.scalatest.Assertions.fail(Assertions.scala:1091) at org.scalatest.Assertions.fail$(Assertions.scala:1087) at org.scalatest.Assertions$.fail(Assertions.scala:1389) at kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:158) at kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition(PlaintextConsumerTest.scala:804) ``` ``` org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > testReplication FAILED java.lang.RuntimeException: Could not find enough records. found 0, expected 100 at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435) at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:217) ``` ``` kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition ``` ``` org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores ``` ``` org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop ```
[jira] [Resolved] (KAFKA-10185) Streams should log summarized restoration information at info level
[ https://issues.apache.org/jira/browse/KAFKA-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-10185. -- Resolution: Fixed > Streams should log summarized restoration information at info level > --- > > Key: KAFKA-10185 > URL: https://issues.apache.org/jira/browse/KAFKA-10185 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.6.0, 2.5.1 > > > Currently, restoration progress is only visible at DEBUG level in the Consumer's Fetcher logs. Users can register a restoration listener and implement their own logging, but having some logs available at INFO level would substantially improve operability. > Logging each partition in each restore batch at INFO level would be too much, though, so we should print summarized logs at a reduced frequency, such as once every 10 seconds.
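The ticket asks for summarized restoration progress at INFO level, emitted periodically rather than per batch. A minimal sketch of that throttling idea, assuming a caller-supplied clock and hypothetical names (this is not the Kafka Streams implementation and does not use its logging), accumulates restored-record counts per partition and returns one summary line at most once per interval:

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch (not Kafka Streams' implementation) of interval-based summary
 * logging: per-batch restore counts are accumulated, and a single summary line is
 * produced at most once per interval.
 */
final class RestoreSummarySketch {

    private final long intervalMs;
    private long lastSummaryMs;
    // TreeMap keeps partitions sorted so the summary line is stable.
    private final Map<String, Long> restoredSinceLastSummary = new TreeMap<>();

    RestoreSummarySketch(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastSummaryMs = startMs;
    }

    /** Records one restored batch; returns a summary line if the interval has elapsed, else null. */
    String onBatchRestored(String partition, long records, long nowMs) {
        restoredSinceLastSummary.merge(partition, records, Long::sum);
        if (nowMs - lastSummaryMs < intervalMs) {
            return null; // too soon: keep accumulating instead of logging every batch
        }
        StringBuilder line = new StringBuilder("Restoration in progress:");
        restoredSinceLastSummary.forEach((p, n) -> line.append(' ').append(p).append('=').append(n));
        restoredSinceLastSummary.clear();
        lastSummaryMs = nowMs;
        return line.toString();
    }
}
```

Passing `nowMs` in rather than reading the system clock keeps the behavior deterministic and easy to test; a real implementation would hand the returned line to an INFO-level logger.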
[jira] [Commented] (KAFKA-10185) Streams should log summarized restoration information at info level
[ https://issues.apache.org/jira/browse/KAFKA-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146739#comment-17146739 ] John Roesler commented on KAFKA-10185: -- Hey [~rhauch], sorry, I should have emailed the list when I added this. The fix for this is already merged in the 2.6 branch, so I think a fix version of 2.6.0 is correct. I just neglected to actually close the ticket. I'm sorry, I know this didn't make your job any easier. Thanks, -John
[jira] [Updated] (KAFKA-10185) Streams should log summarized restoration information at info level
[ https://issues.apache.org/jira/browse/KAFKA-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10185: - Fix Version/s: (was: 2.7.0) 2.5.1 2.6.0
[GitHub] [kafka] vvcephei commented on a change in pull request #8927: KAFKA-10200: Fix testability of PAPI with windowed stores
vvcephei commented on a change in pull request #8927: URL: https://github.com/apache/kafka/pull/8927#discussion_r446473287

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+public final class ProcessorContextUtils {

Review comment:
Uh, no, that was an oversight :) I might as well fix it in this PR, so I won't merge just now. Thanks for pointing it out.

I think where it leaves us, in a nutshell, is that state stores should never cast the context, at least not as long as their `init()` method is a public interface. This class is a way to collect all the casting-related sins in one place where we can keep an eye on them. Casting doesn't always indicate a design failure, but in this case it 100% does. I'd hope that everything in this class gets designed away and we can delete the class. I'd flip the table and do it right now, but I don't want to block the unit-testability of the PAPI behind a bunch of KIPs. I'll make tickets to fix the stuff that needs to be fixed.

In contrast, note that all the Processor implementations that power our DSL are very much _not_ for public use, so for them, casting to InternalProcessorContext is fair game. That said, I _have_ found and/or fixed a fair number of bugs stemming from casting in the processors as well. So although it's not a contract violation, it still may not be a good idea.
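This sketch illustrates the idea of confining every downcast to one utility class, using stand-in interfaces. `PublicContext`, `InternalContext`, and `ContextCasts` are hypothetical. They only mirror the roles of ProcessorContext, InternalProcessorContext, and ProcessorContextUtils; the real class's contents are not shown in this thread. With a single choke point, every cast can be found and audited, and it fails with a clear message when handed a context that cannot satisfy the internal contract.

```java
// Hypothetical public-facing context (stands in for ProcessorContext).
interface PublicContext {
    String applicationId();
}

// Hypothetical internal extension (stands in for InternalProcessorContext).
interface InternalContext extends PublicContext {
    String changelogFor(String storeName);
}

// Hypothetical analogue of a casting utility: the only place that downcasts,
// so every cast can be audited and eventually designed away.
final class ContextCasts {
    private ContextCasts() { }

    static InternalContext asInternal(PublicContext context) {
        if (context instanceof InternalContext) {
            return (InternalContext) context;
        }
        // A user-supplied context (e.g. a test double) cannot satisfy the internal contract.
        throw new IllegalArgumentException(
            "Expected an internal context but got " + context.getClass().getName());
    }
}
```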
[GitHub] [kafka] vvcephei commented on pull request #8914: MINOR: Do not swallow exception when collecting PIDs
vvcephei commented on pull request #8914: URL: https://github.com/apache/kafka/pull/8914#issuecomment-650474359 Thanks for running the tests. One test failed. I haven't looked to see if it's important: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-26--001.1593186338--cadonna--donot_swallow_exeption_when_retrieving_pids--911b7a2f0/report.html
[GitHub] [kafka] vvcephei commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
vvcephei commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r446472130

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+
+public abstract class AbstractStateManager implements StateManager {

Review comment:
Thanks! This seems like the kind of debate that will continue for a while :) Just to share the thought: adding abstraction isn't the only way to de-duplicate code. In fact, composition (for example, adding utility methods) is generally less costly to maintain.
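This sketch de-duplicates through a shared utility method rather than an abstract base class. All names (`Store`, `StoreUtils`, `ActiveStoreManager`, `StandbyStoreManager`) are hypothetical, and the shared step (closing every store while recording which ones failed) is only an illustrative stand-in for whatever logic the real state managers share.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal store abstraction.
interface Store {
    String name();
    void close();
}

// Shared logic as a stateless helper rather than an abstract superclass.
final class StoreUtils {
    private StoreUtils() { }

    // Closes every store even if some fail; returns the names of the stores that threw.
    static List<String> closeAll(List<? extends Store> stores) {
        List<String> failed = new ArrayList<>();
        for (Store store : stores) {
            try {
                store.close();
            } catch (RuntimeException e) {
                failed.add(store.name());
            }
        }
        return failed;
    }
}

// Two unrelated managers reuse the helper by calling it, not by inheriting from a common base.
final class ActiveStoreManager {
    private final List<Store> stores;
    ActiveStoreManager(List<Store> stores) { this.stores = stores; }
    List<String> close() { return StoreUtils.closeAll(stores); }
}

final class StandbyStoreManager {
    private final List<Store> stores;
    StandbyStoreManager(List<Store> stores) { this.stores = stores; }
    List<String> close() { return StoreUtils.closeAll(stores); }
}
```

Either manager can later change its own structure without touching a common superclass. The helper stays testable on its own.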
[GitHub] [kafka] vvcephei commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
vvcephei commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r446471839

##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -103,8 +104,13 @@ public void init(final ProcessorContext context,
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
+        final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
+        final String storeName = name();
+        final String changelogTopic = internalProcessorContext.changelogFor(storeName);
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            changelogTopic != null ?
+                changelogTopic :
+                ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),

Review comment:
No problem! I'm glad it proved useful.
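The fallback in the diff above can be sketched in isolation. In this self-contained version, a plain `Map` stands in for `InternalProcessorContext.changelogFor(storeName)`. The default name follows the `<application.id>-<store name>-changelog` convention produced by `ProcessorStateManager.storeChangelogTopic`. `ChangelogTopicResolver` is a hypothetical name.

```java
import java.util.Map;

final class ChangelogTopicResolver {
    private ChangelogTopicResolver() { }

    // Default naming convention: <application.id>-<storeName>-changelog.
    static String defaultChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Prefer the topic registered for the store (e.g. a source topic reused as the
    // changelog of a source table); otherwise fall back to the default name.
    static String resolve(Map<String, String> registeredChangelogs, String applicationId, String storeName) {
        String registered = registeredChangelogs.get(storeName);
        return registered != null ? registered : defaultChangelogTopic(applicationId, storeName);
    }
}
```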
[GitHub] [kafka] vvcephei commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
vvcephei commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r446471736

##
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
##
@@ -124,18 +141,19 @@ public void before() {
             Serdes.String()
         );
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
-        expect(context.metrics())
-            .andReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion)).anyTimes();
-        expect(context.taskId()).andReturn(taskId).anyTimes();
-        expect(inner.name()).andReturn("metered").anyTimes();
+        expect(context.applicationId()).andStubReturn(APPLICATION_ID);
+        expect(context.metrics()).andStubReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion));
+        expect(context.taskId()).andStubReturn(taskId);
+        expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);

Review comment:
Thanks, all. What Sophie mentioned is more in line with what I had in mind. The advantage of something like "MIPC" (MockInternalProcessorContext) is that it generally satisfies the API contract without actually doing things like writing to the brokers. Because it's a contract-valid implementation of InternalProcessorContext, we can use it transparently for any use case where we don't care exactly which calls the unit under test makes into the context. EasyMock is very powerful, but it can't magically implement a complex interface contract, so it can never fill this gap.

On the other hand, if there are really only going to be one or two calls, then it's fine. And, of course, if we _need_ to verify interactions, exceptions, etc., then EasyMock gives us a very powerful set of utilities for it.

Anyway, we don't have to change this right now, but I've certainly spent many hours fiddling with EasyMock expectations just like these, and that provides no value if you just want the thing to behave the way MIPC already behaves. That's why I pointed it out.
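This sketch shows the difference between stubbing each call and using a contract-valid fake. `StoreContext` and `FakeStoreContext` are hypothetical stand-ins, far smaller than the real InternalProcessorContext. The point is that a test using the fake needs no per-method expectations: every method already behaves sensibly, and nothing talks to a broker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical slice of a processor-context interface.
interface StoreContext {
    String applicationId();
    String changelogFor(String storeName);
    void register(String storeName, String changelogTopic);
}

// Hypothetical hand-written fake in the spirit of MockInternalProcessorContext:
// simple, contract-valid behavior for every method, no broker interaction.
final class FakeStoreContext implements StoreContext {
    private final String applicationId;
    private final Map<String, String> changelogs = new HashMap<>();
    final List<String> registered = new ArrayList<>();    // exposed for optional assertions

    FakeStoreContext(String applicationId) { this.applicationId = applicationId; }

    @Override public String applicationId() { return applicationId; }

    @Override public String changelogFor(String storeName) {
        return changelogs.get(storeName);                  // null if the store has no changelog
    }

    @Override public void register(String storeName, String changelogTopic) {
        registered.add(storeName);
        if (changelogTopic != null) {
            changelogs.put(storeName, changelogTopic);
        }
    }
}
```

By contrast, the EasyMock setup in the diff above needs one expectation for each call the unit under test happens to make.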
[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r446126644

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -125,35 +131,37 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
 
             final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
 
-            for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
-                final String topicName = createTopicResult.getKey();
-                try {
-                    createTopicResult.getValue().get();
-                    topicsNotReady.remove(topicName);
-                } catch (final InterruptedException fatalException) {
-                    // this should not happen; if it ever happens it indicate a bug
-                    Thread.currentThread().interrupt();
-                    log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                    throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                } catch (final ExecutionException executionException) {
-                    final Throwable cause = executionException.getCause();
-                    if (cause instanceof TopicExistsException) {
-                        // This topic didn't exist earlier or its leader not known before; just retain it for next round of validation.
-                        log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" +
-                            "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" +
-                            "Error message was: {}", topicName, retryBackOffMs, cause.toString());
-                    } else {
-                        log.error("Unexpected error during topic creation for {}.\n" +
-                            "Error message was: {}", topicName, cause.toString());
-                        throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
+            if (createTopicsResult != null) {
+                for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {

Review comment:
We need the null check on `createTopicsResult` because `newTopics` might be empty.
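This self-contained sketch covers the per-topic result handling in the diff above, including a null guard. It assumes `java.util.concurrent.Future` in place of `KafkaFuture`, which implements that interface. It also defines a local stand-in `TopicExistsException` so it compiles without Kafka on the classpath. `TopicCreationResults` is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Local stand-in for org.apache.kafka.common.errors.TopicExistsException (assumption).
class TopicExistsException extends RuntimeException {
    TopicExistsException(String msg) { super(msg); }
}

final class TopicCreationResults {
    private TopicCreationResults() { }

    // Returns the topics that still need another round. A null result map
    // (nothing was submitted) leaves the not-ready set unchanged.
    static Set<String> remainingTopics(Set<String> topicsNotReady,
                                       Map<String, ? extends Future<Void>> results) {
        Set<String> remaining = new HashSet<>(topicsNotReady);
        if (results == null) {
            return remaining;
        }
        for (Map.Entry<String, ? extends Future<Void>> entry : results.entrySet()) {
            String topic = entry.getKey();
            try {
                entry.getValue().get();
                remaining.remove(topic);                       // created successfully
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();            // preserve interrupt status
                throw new IllegalStateException("Interrupted while creating " + topic, e);
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof TopicExistsException)) {
                    throw new RuntimeException("Could not create topic " + topic, e.getCause());
                }
                // TopicExistsException: keep the topic for the next validation round.
            }
        }
        return remaining;
    }
}
```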
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146724#comment-17146724 ] Neo Wu commented on KAFKA-10134: Something like this fixes my issue, but I am not sure whether it is the right thing to do in the bigger picture:
{code:java}
// poll for new data until the timeout expires
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = null;
do {
    client.maybeTriggerWakeup();

    if (includeMetadataInTimeout) {
        // try to update assignment metadata BUT do not need to block on the timer if we still have
        // some assigned partitions, since even if we are 1) in the middle of a rebalance
        // or 2) have partitions with unknown starting positions we may still want to return some data
        // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
        // to never block on completing the rebalance procedure if there's any
        if (subscriptions.fetchablePartitions(tp -> true).isEmpty() || records == null || records.isEmpty()) {
            updateAssignmentMetadataIfNeeded(timer);
        } else {
            final Timer updateMetadataTimer = time.timer(0L);
            updateAssignmentMetadataIfNeeded(updateMetadataTimer);
            timer.update(updateMetadataTimer.currentTimeMs());
        }
    } else {
        while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
            log.warn("Still waiting for metadata");
        }
    }

    records = pollForFetches(timer);
    if (!records.isEmpty()) {
        // before returning the fetched records, we can send off the next round of fetches
        // and avoid block waiting for their responses to enable pipelining while the user
        // is handling the fetched records.
        //
        // NOTE: since the consumed position has already been updated, we must not allow
        // wakeups or any other errors to be triggered prior to returning the fetched records.
        if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
            client.transmitSends();
        }

        return this.interceptors.onConsume(new ConsumerRecords<>(records));
    }
} while (timer.notExpired());
{code}
> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case.
> > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0.
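The branching condition in the patch above can be isolated as a pure function. This is a hypothetical distillation for discussion, not the fix that was eventually merged. It returns the timeout to use for the metadata update. It blocks for the remaining poll time unless there are fetchable partitions and the previous fetch actually returned records; only in that case does it use a non-blocking 0 ms update.

```java
final class MetadataWaitPolicy {
    private MetadataWaitPolicy() { }

    // previousRecordCount is null before the first fetch of this poll() call.
    static long metadataUpdateTimeoutMs(boolean hasFetchablePartitions,
                                        Integer previousRecordCount,
                                        long remainingPollMs) {
        boolean lastFetchReturnedData = previousRecordCount != null && previousRecordCount > 0;
        // Non-blocking only when there is data to hand back; otherwise wait on the timer.
        return (hasFetchablePartitions && lastFetchReturnedData) ? 0L : remainingPollMs;
    }
}
```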
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146723#comment-17146723 ] Neo Wu commented on KAFKA-10134: Maybe, besides fetchablePartitions, it should also check the result of records = pollForFetches(timer)?
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146721#comment-17146721 ] Neo Wu commented on KAFKA-10134: Hi [~guozhang], thanks for the quick update! I reviewed and tested your patch, and still found an issue:
{code:java}
if (subscriptions.fetchablePartitions(tp -> true).isEmpty()) {
    updateAssignmentMetadataIfNeeded(timer);
} else {
    final Timer updateMetadataTimer = time.timer(0L);
    updateAssignmentMetadataIfNeeded(updateMetadataTimer);
    timer.update(updateMetadataTimer.currentTimeMs());
}
{code}
There are two scenarios:
1) Start the Java consumer while Kafka is stopped. The code works as expected: it takes the first branch, waits on the timer, and connects to Kafka once Kafka is up (with low CPU usage, thanks to the timer).
2) Start both the Java consumer and Kafka, let the consumer successfully subscribe to some topics, then force-stop Kafka. Now subscriptions.fetchablePartitions(tp -> true) returns a non-empty result, so the loop takes the second branch without blocking, which causes a busy CPU wait.
Thanks,
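This is a rough arithmetic model of why the non-blocking branch in scenario 2 burns CPU. If the coordinator stays unreachable and each retry waits 0 ms, the loop repeats as fast as a single lookup attempt can run. The 50 µs per-attempt cost and the 100 ms backoff used below are assumed numbers, not measurements from this issue. `RetryLoopModel` is hypothetical and involves no real clock or networking.

```java
final class RetryLoopModel {
    private RetryLoopModel() { }

    // Counts lookup attempts inside a window of windowMicros when every attempt fails.
    // Each attempt costs attemptCostMicros of (simulated) CPU, then waits backoffMicros.
    static long attemptsInWindow(long windowMicros, long attemptCostMicros, long backoffMicros) {
        if (attemptCostMicros + backoffMicros <= 0) {
            throw new IllegalArgumentException("each attempt must advance the simulated clock");
        }
        long now = 0;
        long attempts = 0;
        while (now < windowMicros) {
            attempts++;
            now += attemptCostMicros + backoffMicros;
        }
        return attempts;
    }
}
```

Under those assumptions, `attemptsInWindow(1_000_000, 50, 0)` gives 20,000 attempts that fill the entire one-second window: one fully busy core per consumer thread. `attemptsInWindow(1_000_000, 50, 100_000)` gives 10 attempts, using about 0.05% of it.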
[jira] [Created] (KAFKA-10207) Untrimmed Index files cause premature log segment deletions on startup
Johnny Malizia created KAFKA-10207: -- Summary: Untrimmed Index files cause premature log segment deletions on startup Key: KAFKA-10207 URL: https://issues.apache.org/jira/browse/KAFKA-10207 Project: Kafka Issue Type: Bug Components: log Affects Versions: 2.4.1, 2.3.1, 2.4.0 Reporter: Johnny Malizia
[KIP-263|https://cwiki.apache.org/confluence/display/KAFKA/KIP-263%3A+Allow+broker+to+skip+sanity+check+of+inactive+segments+on+broker+startup#KIP263:Allowbrokertoskipsanitycheckofinactivesegmentsonbrokerstartup-Evaluation] appears to have introduced a change that explicitly skips calling the sanityCheck method on the time and offset index files that Kafka loads at startup. I found a particularly nasty bug using the following configuration:
{code:java}
jvm=1.8.0_191
zfs=0.6.5.6
kernel=4.4.0-1013-aws
kafka=2.4.1
{code}
The bug was that the retention period, whether configured for the topic or at the broker level, seemed not to be respected. No matter what, when the broker started up it decided that all log segments on disk were breaching the retention window, and the data was purged.
{code:java}
Found deletable segments with base offsets [11610665,12130396,12650133] due to retention time 8640ms breach
{code}
{code:java}
Rolled new log segment at offset 12764291 in 1 ms. (kafka.log.Log)
Scheduling segments for deletion List(LogSegment(baseOffset=11610665, size=1073731621, lastModifiedTime=1592532125000, largestTime=0), LogSegment(baseOffset=12130396, size=1073727967, lastModifiedTime=1592532462000, largestTime=0), LogSegment(baseOffset=12650133, size=235891971, lastModifiedTime=1592532531000, largestTime=0))
{code}
Further logging showed that the problem occurred while loading the index files, indicating that the final writes to trim the index had not succeeded:
{code:java}
DEBUG Loaded index file /mnt/kafka-logs/test_topic-0/17221277.timeindex with maxEntries = 873813, maxIndexSize = 10485760, entries = 873813, lastOffset = TimestampOffset(0,17221277), file position = 10485756 (kafka.log.TimeIndex)
{code}
Because the index keeps its preallocated zero bytes at the tail, the last entry's timestamp reads as 0 when the index is loaded again after a Kafka restart. This leads to premature time-based (TTL) deletion of the log segments. I traced the issue to the JVM version in use, since upgrading the JVM resolved it. Still, Kafka should never delete data by mistake like this: a rolling restart with this bug in place would cause complete data loss across the cluster.
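The log line above is consistent with a 12-byte time index entry (an 8-byte timestamp plus a 4-byte relative offset). 10485760 / 12 rounds down to 873813 entries, and 873813 × 12 = 10485756, which is exactly the reported file position. So the whole preallocated file was treated as entries.

The sketch below shows one way a zero-filled tail could be detected before trusting the last entry. `TimeIndexCheck` is hypothetical and is not Kafka's `sanityCheck`. It assumes real timestamps are positive and non-decreasing.

```java
import java.nio.ByteBuffer;

final class TimeIndexCheck {
    static final int ENTRY_SIZE = 12;   // 8-byte timestamp + 4-byte relative offset

    private TimeIndexCheck() { }

    // Counts leading entries that look valid: stops at the first entry whose timestamp
    // is 0 or goes backwards, which is what an untrimmed, zero-filled tail looks like.
    static int validEntries(ByteBuffer index) {
        int entries = index.limit() / ENTRY_SIZE;
        long previous = Long.MIN_VALUE;
        for (int i = 0; i < entries; i++) {
            long ts = index.getLong(i * ENTRY_SIZE);
            if (ts == 0L || ts < previous) {
                return i;
            }
            previous = ts;
        }
        return entries;
    }

    // Timestamp of the last valid entry, or -1 if there is none.
    static long largestTimestamp(ByteBuffer index) {
        int n = validEntries(index);
        return n == 0 ? -1L : index.getLong((n - 1) * ENTRY_SIZE);
    }

    // Time-based retention: deletable once the largest timestamp is older than retentionMs.
    static boolean retentionBreached(long nowMs, long largestTimestampMs, long retentionMs) {
        return nowMs - largestTimestampMs > retentionMs;
    }
}
```

Reading 0 from the zero-filled tail instead of the last valid timestamp makes `retentionBreached` true for any realistic retention. That matches the `largestTime=0` segments in the deletion log.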
[jira] [Updated] (KAFKA-10189) Reset metric EventQueueTimeMs
[ https://issues.apache.org/jira/browse/KAFKA-10189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10189: Labels: (was: core) > Reset metric EventQueueTimeMs > -- > > Key: KAFKA-10189 > URL: https://issues.apache.org/jira/browse/KAFKA-10189 > Project: Kafka > Issue Type: Bug > Components: controller, core, metrics >Reporter: Jeff Kim >Assignee: Jeff Kim >Priority: Minor > > The metric > [EventQueueTimeMs|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerEventManager.scala#L81] > does not reset and therefore misrepresents the controller event queue time > in these two scenarios: > 1. upon losing leader election - `EventQueueTimeMs` portrays the last event > queue time of the previous controller and not the current controller > 2. no controller events are added to the queue - `EventQueueTimeMs` portrays > the most recent event queue time, not the current queue time (which is 0) > For both cases, we should reset the controller event queue time to 0. > Implementation: > Instead of using `LinkedBlockingQueue.take()` > [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerEventManager.scala#L118], > we can use `LinkedBlockingQueue.poll(long timeout, TimeUnit unit)` and reset > `EventQueueTimeMs` if the queue is empty.
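This sketch implements the proposed change. The loop waits with `LinkedBlockingQueue.poll(timeout, unit)` instead of `take()`, and resets the reported queue time to 0 when the wait times out. `QueueTimeTracker` is hypothetical, and the metric is modeled as an `AtomicLong`. `nowMs` is passed in only to keep the example deterministic; a real loop would read its clock after `poll` returns. The sketch covers the empty-queue scenario only, not the lost-election one.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class QueueTimeTracker {
    // Hypothetical queued event carrying its enqueue time.
    static final class QueuedEvent {
        final String name;
        final long enqueuedAtMs;
        QueuedEvent(String name, long enqueuedAtMs) { this.name = name; this.enqueuedAtMs = enqueuedAtMs; }
    }

    final LinkedBlockingQueue<QueuedEvent> queue = new LinkedBlockingQueue<>();
    // Value a gauge such as EventQueueTimeMs would report (hypothetical wiring).
    final AtomicLong lastQueueTimeMs = new AtomicLong(0L);

    // One event-loop iteration: wait at most pollTimeoutMs instead of blocking forever.
    QueuedEvent pollOnce(long pollTimeoutMs, long nowMs) {
        final QueuedEvent event;
        try {
            event = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();          // preserve interrupt status for the caller
            return null;
        }
        if (event == null) {
            lastQueueTimeMs.set(0L);                     // idle queue: current queue time is 0
        } else {
            lastQueueTimeMs.set(nowMs - event.enqueuedAtMs);
        }
        return event;
    }
}
```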
[jira] [Updated] (KAFKA-10189) Reset metric EventQueueTimeMs
[ https://issues.apache.org/jira/browse/KAFKA-10189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10189: Labels: core (was: pull-request-available)
[jira] [Updated] (KAFKA-10189) Reset metric EventQueueTimeMs
[ https://issues.apache.org/jira/browse/KAFKA-10189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10189: Component/s: core
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146707#comment-17146707 ] Almog Gavra commented on KAFKA-10179: - [~ableegoldman] I confirmed locally that nothing "breaks" if we use a deserializer that projects a subset of the fields in the record, as you suspected, but consider the following points:
# Some of the most popular serdes are asymmetric (e.g. Avro builds the concept of reader/writer schemas into its APIs)
# It may be impossible to determine, for a given serde, whether it is symmetric
# State after recovery should be identical to state before recovery, for predictable operations (especially in cloud environments)
# Some of the most popular serdes have side effects (e.g. Confluent Schema Registry serdes will create subjects on your behalf)
In practice, the first three points, in conjunction with what [~mjsax] said (the source-topic-changelog optimization really only applies if the data in the input topic is exactly the same as in the changelog topic, and thus we avoid creating the changelog topic), mean that we can't safely turn on the source-topic-changelog optimization unless the user indicates either (a) they are using a symmetric serde or (b) they are willing to waive 3 in order to speed up recovery ([~cadonna] if we consider 3 a matter of correctness, we can't sacrifice correctness for performance without the user's consent). Even if the user indicates (a) or (b) above, I still don't think we can implement the fix described here, because of the fourth point. It is possible that the user is using a symmetric serde but their schema is not identical to the one that wrote to the Kafka topic (e.g. ksql generates a new schema where all the fields are the same but the schema has a different name; I can also easily imagine a schema with _more_ fields that would write the same value as it read from an event with fewer fields).
I'm not sure I understand this comment: "The data would be deserialized and re-serialized using the same Serde (this is inefficiency we pay, as we also need to send the de-serialized data downstream for further processing)." Why can't we just always pass the data through into the state store if the optimization is enabled? > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. Currently, it always passes {{<application ID>-<store name>-changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. > Most serdes do not use the topic name passed to them. However, if the serdes > actually use the topic name for (de)serialization, e.g., when Kafka Streams > is used with Confluent's Schema Registry, a > {{org.apache.kafka.common.errors.SerializationException}} is thrown.
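The symmetry concern in points 1 and 2 can be made concrete with a round-trip check: deserialize the stored bytes, re-serialize them, and compare with the original. The sketch below is hypothetical; the "projecting" deserializer that keeps only the first comma-separated field stands in for a reader schema with fewer fields, and none of these names come from Kafka Streams. Note that such a check can only *demonstrate* asymmetry on sample data; passing it does not prove a serde symmetric, which is exactly point 2.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical illustration: why restoring by re-serializing may differ from pass-through.
final class SerdeRoundTrip {
    // Hypothetical "projecting" deserializer: keeps only the first comma-separated field,
    // similar in spirit to reading with a schema that has fewer fields.
    static String projectFirstField(byte[] bytes) {
        String s = new String(bytes, StandardCharsets.UTF_8);
        int comma = s.indexOf(',');
        return comma < 0 ? s : s.substring(0, comma);
    }

    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** True if deserialize-then-serialize reproduces the original bytes for this one sample. */
    static <T> boolean roundTripsExactly(byte[] original,
                                         Function<byte[], T> deserializer,
                                         Function<T, byte[]> serializer) {
        return Arrays.equals(original, serializer.apply(deserializer.apply(original)));
    }
}
```

With a pass-through restore the store would hold `alice,42,berlin`; with a deserialize/re-serialize restore it would hold `alice`, so state after recovery would no longer match state before it.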
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146706#comment-17146706 ] Guozhang Wang commented on KAFKA-10134: --- I've provided a PR for trunk, but it should apply cleanly to 2.5 as well (I will cherry-pick this when merging). [~neowu0] [~seanguo] please let me know whether you could apply the patch and verify that it helps resolve the issue. > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. 
> > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jeffkbkim opened a new pull request #8935: reset event queue time histogram when queue is empty
jeffkbkim opened a new pull request #8935: URL: https://github.com/apache/kafka/pull/8935
* https://issues.apache.org/jira/browse/KAFKA-10189
* add a timeout for event queue time histogram
* reset `eventQueueTimeHist` when the controller event queue is empty
* unit test for resetting event queue time histogram
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions
guozhangwang commented on pull request #8934: URL: https://github.com/apache/kafka/pull/8934#issuecomment-650464132 @hachikuji @ijuma
[GitHub] [kafka] guozhangwang opened a new pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions
guozhangwang opened a new pull request #8934: URL: https://github.com/apache/kafka/pull/8934 The intention of using poll(0) is to not block on rebalance but still return some data; however, if there are no fetchable partitions, then there is no point in polling with 0 ms. What's worse, with poll(0) we may fall into a busy loop, since we may advance the system test at too small a pace. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
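The busy-loop argument in this PR can be illustrated with a toy model. This is a hypothetical sketch, not the consumer's actual code: `fetchTimeoutMs` and `iterationsWithNothingFetchable` are invented names, and the model assumes that a 0 ms poll still costs about 1 ms of wall time. It compares "always poll with 0 ms" against "use the caller's remaining timeout when nothing is fetchable" for a caller with a 1000 ms deadline.

```java
// Hypothetical model of the timeout choice described in the PR (not KafkaConsumer internals).
final class PollTimeoutSketch {
    /**
     * With fetchable partitions, a 0 ms timeout lets the rebalance step stay non-blocking
     * while still returning data. With nothing fetchable, a 0 ms poll just returns and the
     * caller loops again, so we fall back to the caller's remaining timeout.
     */
    static long fetchTimeoutMs(boolean hasFetchablePartitions, long remainingTimeoutMs) {
        if (remainingTimeoutMs <= 0) {
            return 0L;
        }
        return hasFetchablePartitions ? 0L : remainingTimeoutMs;
    }

    /** Counts loop iterations until the deadline when no partition is fetchable. */
    static int iterationsWithNothingFetchable(boolean longPollWhenIdle, long userTimeoutMs) {
        long now = 0L;
        int iterations = 0;
        while (now < userTimeoutMs) {
            long timeout = longPollWhenIdle ? fetchTimeoutMs(false, userTimeoutMs - now) : 0L;
            now += Math.max(timeout, 1L); // assumption: even a 0 ms poll costs ~1 ms
            iterations++;
        }
        return iterations;
    }
}
```

Under these assumptions, always polling with 0 ms spins 1000 times before the deadline, while the long poll blocks once; on real hardware a 0 ms poll costs far less than 1 ms, so the spinning would be much worse.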
[GitHub] [kafka] guozhangwang commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
guozhangwang commented on a change in pull request #8905: URL: https://github.com/apache/kafka/pull/8905#discussion_r446461812
##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##
@@ -258,34 +263,43 @@ private void logValue(final Bytes key, final BufferKey bufferKey, final BufferVa
 final int sizeOfBufferTime = Long.BYTES;
 final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
 buffer.putLong(bufferKey.time());
-
+final byte[] array = buffer.array();
 ((RecordCollector.Supplier) context).recordCollector().send(
-changelogTopic,
-key,
-buffer.array(),
-V_2_CHANGELOG_HEADERS,
-partition,
-null,
-KEY_SERIALIZER,
-VALUE_SERIALIZER
+changelogTopic,
+key,
+array,
+CHANGELOG_HEADERS,
+partition,
+null,
+KEY_SERIALIZER,
+VALUE_SERIALIZER
 );
 }
 private void logTombstone(final Bytes key) {
 ((RecordCollector.Supplier) context).recordCollector().send(
-changelogTopic,
-key,
-null,
-null,
-partition,
-null,
-KEY_SERIALIZER,
-VALUE_SERIALIZER
+changelogTopic,
+key,
+null,
+null,
Review comment:
SG
[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on pull request #8905: URL: https://github.com/apache/kafka/pull/8905#issuecomment-650462969 OK, @guozhangwang, this is my "final" iteration. I pulled the system tests out, and I'll follow up with another PR later. This PR should be sufficient for the basic purpose, thanks to the new "binary" compatibility unit tests.
[jira] [Resolved] (KAFKA-6453) Reconsider timestamp propagation semantics
[ https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victoria Bialas resolved KAFKA-6453. Resolution: Fixed Fixed by James Galasyn in https://github.com/apache/kafka/pull/8920 > Reconsider timestamp propagation semantics > -- > > Key: KAFKA-6453 > URL: https://issues.apache.org/jira/browse/KAFKA-6453 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Victoria Bialas >Priority: Major > Labels: needs-kip > > At the moment, Kafka Streams only has a defined "contract" about timestamp propagation > at the Processor API level: all processors within a sub-topology see the > timestamp from the input topic record, and this timestamp will be used for all > result records when writing them to a topic, too. > The DSL inherits this "contract" at the moment. > From a DSL point of view, it would be desirable to provide a different > contract to the user. To allow this, we need to do the following: > - extend the Processor API to allow manipulating timestamps (i.e., a Processor can > set a new timestamp for downstream records) > - define a DSL "contract" for timestamp propagation for each DSL operator > - document the DSL "contract" > - implement the DSL "contract" using the new/extended Processor API
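The difference between the two kinds of "contract" can be sketched with a toy record type. This is a hypothetical model, not the Kafka Streams API: `Rec`, `mapValuesInherit`, and `sumWithMaxTimestamp` are invented names. The first method follows the Processor API contract described in the ticket (results inherit the input timestamp); the second shows *one possible* per-operator DSL contract, where an aggregation result carries the largest timestamp of the records that contributed to it. The ticket does not say which contract each operator should get.

```java
import java.util.List;

// Hypothetical model of timestamp propagation contracts (not Kafka Streams classes).
final class TimestampPropagation {
    static final class Rec {
        final String key;
        final long value;
        final long timestamp;

        Rec(String key, long value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Contract described in the ticket: a result inherits the input record's timestamp.
    static Rec mapValuesInherit(Rec input, long newValue) {
        return new Rec(input.key, newValue, input.timestamp);
    }

    // One possible DSL-level contract for an aggregation (an assumption, for illustration):
    // the result carries the maximum timestamp among its input records.
    static Rec sumWithMaxTimestamp(String key, List<Rec> inputs) {
        if (inputs.isEmpty()) {
            throw new IllegalArgumentException("need at least one input record");
        }
        long sum = 0L;
        long maxTimestamp = Long.MIN_VALUE;
        for (Rec r : inputs) {
            sum += r.value;
            maxTimestamp = Math.max(maxTimestamp, r.timestamp);
        }
        return new Rec(key, sum, maxTimestamp);
    }
}
```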
[GitHub] [kafka] ableegoldman closed pull request #6633: [MINOR] Better estimate size of cache
ableegoldman closed pull request #6633: URL: https://github.com/apache/kafka/pull/6633
[GitHub] [kafka] ableegoldman closed pull request #7413: KAFKA-8649: version probing with upgraded leader
ableegoldman closed pull request #7413: URL: https://github.com/apache/kafka/pull/7413
[jira] [Commented] (KAFKA-10177) Replace/improve Percentiles metrics
[ https://issues.apache.org/jira/browse/KAFKA-10177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146681#comment-17146681 ] Sophie Blee-Goldman commented on KAFKA-10177: - I haven't personally looked into HdrHistogram specifically, but I think the approach of porting over an existing and well-tested implementation is the right way to go. It's probably not worth an extra dependency, and it shouldn't be too complicated to re-implement a reasonable percentiles algorithm (famous last words, I know...) > Replace/improve Percentiles metrics > --- > > Key: KAFKA-10177 > URL: https://issues.apache.org/jira/browse/KAFKA-10177 > Project: Kafka > Issue Type: Improvement > Components: metrics >Reporter: Sophie Blee-Goldman >Priority: Major > > There's an existing – but seemingly unused – implementation of percentile > metrics that we attempted to use for end-to-end latency metrics in Streams. > Unfortunately a number of limitations became apparent, and we ultimately > pulled the metrics from the 2.6 release pending further > investigation/improvement. > The problems we encountered were: > # Need to set a static upper/lower limit for the values > # Not well suited to a distribution with a long tail, i.e., setting the max > value too high caused the accuracy to plummet > # Required a lot of memory per metric for reasonable accuracy and caused us > to hit OOM (unclear if there was actually a memory leak, or it was just > gobbling up unnecessarily large amounts in general) > Since the Percentiles class is part of the public API, we may need to create > a new class altogether and possibly deprecate/remove the old one. > Alternatively we can consider just re-implementing the existing class from > scratch, and just deprecating the current constructors and associated > implementation (e.g., the constructor accepts a max)
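The first two problems listed (a static upper limit, and poor accuracy on long tails) are what exponentially sized buckets address. The sketch below is a simple log-bucketed histogram, *not* HdrHistogram's algorithm and not Kafka's `Percentiles` class; the class name and the choice to report each bucket's upper bound are assumptions. Bucket `i` covers `[base^i, base^(i+1))`, so a reported value overestimates the true one by at most a factor of `base`, whatever the magnitude, and the bucket array grows on demand instead of requiring a configured max.

```java
import java.util.Arrays;

// Hypothetical sketch of percentiles over exponentially sized buckets.
final class LogBucketHistogram {
    private final double base;
    private long[] counts = new long[16];
    private long total;

    LogBucketHistogram(double base) {
        if (base <= 1.0) {
            throw new IllegalArgumentException("base must be > 1");
        }
        this.base = base;
    }

    private int bucketFor(double value) {
        if (value < 1.0) {
            return 0; // values below 1 share the first bucket
        }
        return (int) Math.floor(Math.log(value) / Math.log(base));
    }

    void record(double value) {
        int b = bucketFor(value);
        if (b >= counts.length) {
            // Grow instead of clamping: no static upper limit is needed.
            counts = Arrays.copyOf(counts, Math.max(b + 1, counts.length * 2));
        }
        counts[b]++;
        total++;
    }

    /** Upper bound of the bucket containing the p-th percentile (p in 0..100). */
    double percentile(double p) {
        if (total == 0) {
            return Double.NaN;
        }
        long rank = Math.max(1L, (long) Math.ceil(p / 100.0 * total));
        long seen = 0;
        for (int i = 0; i < counts.length; i++) {
            seen += counts[i];
            if (seen >= rank) {
                return Math.pow(base, i + 1);
            }
        }
        return Math.pow(base, counts.length); // not reached when counts are consistent
    }
}
```

Memory is proportional to the number of buckets, i.e. logarithmic in the largest recorded value, which is the property that matters for the third problem; a smaller `base` trades memory for accuracy.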
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146678#comment-17146678 ] Guozhang Wang commented on KAFKA-10134: --- [~seanguo] I think the issue [~neowu0] brought up is a valid one to tackle, but I'm not sure it has the same root cause as what you've seen: basically, the consumer may be stuck in the metadata refresh loop if it cannot find the coordinator, but that only happens if the coordinator broker is indeed unavailable during that time. In your observation, though, there should be no broker unavailability, since you're just bouncing the consumer instance, and the coordinator should be discovered quickly. > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[jira] [Resolved] (KAFKA-10166) Excessive TaskCorruptedException seen in testing
[ https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman resolved KAFKA-10166. - Resolution: Fixed > Excessive TaskCorruptedException seen in testing > > > Key: KAFKA-10166 > URL: https://issues.apache.org/jira/browse/KAFKA-10166 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Bruno Cadonna >Priority: Blocker > Fix For: 2.6.0 > > > As the title indicates, long-running test applications with injected network > "outages" seem to hit TaskCorruptedException more than expected. > Seen occasionally on the ALOS application (~20 times in two days in one case, > for example), and very frequently with EOS (many times per day)
[jira] [Assigned] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reassigned KAFKA-10134: - Assignee: Guozhang Wang > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[GitHub] [kafka] guozhangwang commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task
guozhangwang commented on pull request #8926: URL: https://github.com/apache/kafka/pull/8926#issuecomment-650426719 Cherry-picked to 2.6 since it is a blocker, cc @rhauch
[GitHub] [kafka] guozhangwang merged pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task
guozhangwang merged pull request #8926: URL: https://github.com/apache/kafka/pull/8926
[GitHub] [kafka] ableegoldman commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams
ableegoldman commented on a change in pull request #8929: URL: https://github.com/apache/kafka/pull/8929#discussion_r446431689
##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##
@@ -226,7 +227,7 @@ public StreamsMetadata getLocalMetadata() {
 if (thisHost.equals(UNKNOWN_HOST)) {
 return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1);
 }
-return new KeyQueryMetadata(localMetadata.hostInfo(), Collections.emptySet(), -1);
+return new KeyQueryMetadata(localMetadata.get().hostInfo(), Collections.emptySet(), -1);
Review comment:
I looked through the StreamsMetadataState and it does seem like it could technically be null if this instance was never assigned any active or standby tasks at all, ever. That really _shouldn't_ happen, but of course it can if you massively over-provisioned your app, and we shouldn't throw an NPE over that. Seems like this is actually an existing bug that we should fix. Then we can improve the initialization check on the side.
[GitHub] [kafka] satishbellapu commented on pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration
satishbellapu commented on pull request #8921: URL: https://github.com/apache/kafka/pull/8921#issuecomment-650419736 + @cmccabe @rajinisivaram for review.
[GitHub] [kafka] ableegoldman commented on pull request #8925: KAFKA-9974: Make produce-sync flush
ableegoldman commented on pull request #8925: URL: https://github.com/apache/kafka/pull/8925#issuecomment-650414999 Is Jenkins linking to the wrong builds? When I try to see which tests failed here, it seems to bring me to the results for two completely different PRs. Really dropping the ball lately, Mr. Jenkins.
[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on pull request #8905: URL: https://github.com/apache/kafka/pull/8905#issuecomment-650413094 I will follow up shortly to extract the system tests to a separate PR, since we're having trouble running the tests at all right now, and we wouldn't know if they are even more broken.
[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on pull request #8905: URL: https://github.com/apache/kafka/pull/8905#issuecomment-650412771 Hey @guozhangwang, you might want to take a look at that last fix. The duck-typing code was sometimes producing an OOME when it would just interpret a random integer out of the buffer as a "size" (integer) and blindly allocate an array of that size. I added a Util (with tests) that has a guard to prevent this.
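The kind of guard described here can be sketched as follows. This is a hypothetical helper, not the Util added in the PR (its actual name and behavior are not shown in this thread). Before allocating, it checks that the length prefix is non-negative and no larger than the bytes that actually follow it, so a misread integer fails fast instead of triggering a huge allocation. It reads the prefix with an absolute `getInt(index)` so that, on rejection, the buffer position is unchanged and a duck-typing caller can try another format.

```java
import java.nio.ByteBuffer;

// Hypothetical guard for reading length-prefixed data from an untrusted buffer.
final class SafeBufferReads {
    static byte[] readLengthPrefixed(ByteBuffer buffer) {
        int start = buffer.position();
        if (buffer.remaining() < Integer.BYTES) {
            throw new IllegalArgumentException("not enough bytes for a length prefix");
        }
        int size = buffer.getInt(start); // absolute read: position unchanged if we reject
        int available = buffer.remaining() - Integer.BYTES;
        if (size < 0 || size > available) {
            // A random int misread as a size would otherwise trigger a huge allocation.
            throw new IllegalArgumentException(
                "implausible length " + size + "; only " + available + " bytes follow the prefix");
        }
        buffer.position(start + Integer.BYTES);
        byte[] out = new byte[size];
        buffer.get(out);
        return out;
    }
}
```

The bound `size <= remaining` is cheap and sufficient here because a valid payload can never be longer than the buffer that contains it.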
[GitHub] [kafka] ableegoldman commented on a change in pull request #8927: KAFKA-10200: Fix testability of PAPI with windowed stores
ableegoldman commented on a change in pull request #8927: URL: https://github.com/apache/kafka/pull/8927#discussion_r446418844
##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+
+public final class ProcessorContextUtils {
Review comment:
I'm trying to phrase this in a less discouraging way, since I agree this is a nice & clever fix: is this meant to be the actual, final solution to this problem, or just a temporary workaround to unblock us without the need for a KIP? This just "happens" to work out nicely because we know the mocks are actually returning a StreamsMetricsImpl as well. What happens if we need to add/access more complicated processor context functionality in the inner state stores? I say "inner" because the caching layers, for example, also perform a cast to InternalProcessorContext. I completely agree with proceeding with this, so don't take that question as a challenge to this PR. Just wondering where this leaves us going forward. Should we accept (and therefore enforce) that state stores can't have caching enabled in unit tests?
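The "it just happens to work" concern can be illustrated with a self-contained model. All types below (`Metrics`, `MetricsImpl`, `OtherMetrics`, `Context`) are hypothetical stand-ins, not Kafka's `ProcessorContext` or `StreamsMetricsImpl`, and the body of the real `ProcessorContextUtils` is not shown in this diff. A utility that reaches the concrete metrics class by casting works for the real context *and* for any mock whose `metrics()` happens to return that concrete class, but it fails at runtime with `ClassCastException` for anything else.

```java
// Hypothetical model of reaching a concrete implementation through an interface by casting.
final class ContextCastSketch {
    interface Metrics {}

    static final class MetricsImpl implements Metrics {}

    static final class OtherMetrics implements Metrics {}

    interface Context {
        Metrics metrics();
    }

    // Compiles against the narrow interface, but only works at runtime if the
    // context (real or mock) actually hands back a MetricsImpl.
    static MetricsImpl getMetricsImpl(Context context) {
        return (MetricsImpl) context.metrics();
    }
}
```

The same reasoning applies to any inner layer that casts the context to a richer internal type: tests pass only as long as the mocks happen to return the internal type.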
[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on a change in pull request #8905: URL: https://github.com/apache/kafka/pull/8905#discussion_r446409619
##
File path: build.gradle
##
@@ -97,7 +97,7 @@ ext {
 buildVersionFileName = "kafka-version.properties"
 defaultMaxHeapSize = "2g"
- defaultJvmArgs = ["-Xss4m", "-XX:+UseParallelGC"]
+ defaultJvmArgs = ["-Xss4m"]
Review comment:
Aha! I figured it out. There actually was a bug in the test. While duck-typing, the code was trying to allocate an array of 1.8GB. It's funny that disabling this flag made this test pass on Java 11 and 14. Maybe the flag partitions the heap on those versions or something, so the test didn't actually have the full 2GB available. Anyway, I'm about to push a fix and put the flag back.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams
ableegoldman commented on a change in pull request #8929: URL: https://github.com/apache/kafka/pull/8929#discussion_r446408964
##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##
@@ -226,7 +227,7 @@ public StreamsMetadata getLocalMetadata() {
 if (thisHost.equals(UNKNOWN_HOST)) {
 return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1);
 }
-return new KeyQueryMetadata(localMetadata.hostInfo(), Collections.emptySet(), -1);
+return new KeyQueryMetadata(localMetadata.get().hostInfo(), Collections.emptySet(), -1);
Review comment:
This gets initialized during the rebalance, and IQ isn't available until Streams has reached RUNNING.
[GitHub] [kafka] abbccdda commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams
abbccdda commented on a change in pull request #8929: URL: https://github.com/apache/kafka/pull/8929#discussion_r446404393 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ## @@ -50,7 +51,7 @@ private final HostInfo thisHost; private List allMetadata = Collections.emptyList(); private Cluster clusterMetadata; -private StreamsMetadata localMetadata; +private AtomicReference localMetadata = new AtomicReference<>(null); Review comment: This could be final. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ## @@ -226,7 +227,7 @@ public StreamsMetadata getLocalMetadata() { if (thisHost.equals(UNKNOWN_HOST)) { return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1); } -return new KeyQueryMetadata(localMetadata.hostInfo(), Collections.emptySet(), -1); +return new KeyQueryMetadata(localMetadata.get().hostInfo(), Collections.emptySet(), -1); Review comment: Hmm, why do we still keep it? Based on the reviews of the previous version, I believe there is a strict ordering that initializes `localMetadata` to non-null on L352 before this logic is hit, but a null check still sounds more resilient to me, unless we explicitly want a NullPointerException to be thrown.
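The two suggestions above (make the `AtomicReference` field `final`, and null-check its value instead of relying on initialization order) can be combined roughly as follows. This is a hypothetical, simplified sketch: `MetadataHolder` and the `String` payload stand in for `StreamsMetadataState` and `StreamsMetadata`, and throwing `IllegalStateException` is one possible choice, not what the PR does.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for StreamsMetadataState; String replaces StreamsMetadata.
final class MetadataHolder {
    // final: the AtomicReference object itself is never reassigned, only its value.
    private final AtomicReference<String> localMetadata = new AtomicReference<>(null);

    // Called from the rebalance path once assignment metadata is known.
    void onRebalance(final String metadata) {
        localMetadata.set(metadata);
    }

    String localHostInfo() {
        // Read once into a local so the null check and the use see the same value.
        final String metadata = localMetadata.get();
        if (metadata == null) {
            throw new IllegalStateException("Local metadata not initialized yet; is Streams RUNNING?");
        }
        return metadata;
    }
}
```

Reading the reference once into a local avoids a second `get()` observing a different value than the one that was null-checked.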
[GitHub] [kafka] ryannedolan commented on pull request #8930: Mirror Maker 2: offset-syncs variable
ryannedolan commented on pull request #8930: URL: https://github.com/apache/kafka/pull/8930#issuecomment-650389551 @cgetzen the larger change would be worth it IMO. I think a KIP is the right way forward. Happy to help, but you should be able to get permission to create a KIP yourself. Might just take a few tries, unfortunately. Maybe check who has been responding to such requests lately and reach out to them directly. Are there other properties that we could make more customizable in this way too? Might be a good time to address them all together.
[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on pull request #8905: URL: https://github.com/apache/kafka/pull/8905#issuecomment-650387794 Ah, that heap space thing was legit. Fix coming...
[GitHub] [kafka] abbccdda commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task
abbccdda commented on pull request #8926: URL: https://github.com/apache/kafka/pull/8926#issuecomment-650385489
```
12:23:50 FAILURE: Build failed with an exception.
12:23:50
12:23:50 * What went wrong:
12:23:50 Execution failed for task ':streams:unitTest'.
12:23:50 > Process 'Gradle Test Executor 9' finished with non-zero exit value 134
12:23:50   This problem might be caused by incorrect test process configuration.
12:23:50   Please refer to the test execution section in the User Manual at https://docs.gradle.org/6.5/userguide/java_testing.html#sec:test_execution
12:23:50
```
[GitHub] [kafka] lct45 commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams
lct45 commented on pull request #8929: URL: https://github.com/apache/kafka/pull/8929#issuecomment-650384491 Failed Connect tests on JDK 14:
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStart
org.apache.kafka.connect.integration.BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStart
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorInitialize
[GitHub] [kafka] ableegoldman commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task
ableegoldman commented on pull request #8926: URL: https://github.com/apache/kafka/pull/8926#issuecomment-650382721 Java 8 and 14 builds passed, Java 11 build failed with...zero failures?
[GitHub] [kafka] cgetzen edited a comment on pull request #8930: Mirror Maker 2: offset-syncs variable
cgetzen edited a comment on pull request #8930: URL: https://github.com/apache/kafka/pull/8930#issuecomment-650381294 Hi @ryannedolan, wondering if you could spare a little guidance. I created an `offset-syncs.topic.override` variable with a default of `null`. What I would have loved to do is create something like `offset-syncs.topic` with a default value of `mm2-offsets-sync..internal`, but I was unable to because of the alias in the string, and I think doing so would require a much larger change. Thoughts? I also emailed d...@kafka.apache.org to get access to create a KIP, but that seems to be a dead end. If I get together the content for a KIP, would you be willing to submit it? Thanks!
[jira] [Commented] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146612#comment-17146612 ] Leah Thomas commented on KAFKA-9509: Failed again:
h3. Stacktrace
java.lang.RuntimeException: Could not find enough records. found 0, expected 100
    at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435)
    at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:221)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1292/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testReplication/] > Fix flaky test MirrorConnectorsIntegrationTest.testReplication > -- > > Key: KAFKA-9509 > URL: https://issues.apache.org/jira/browse/KAFKA-9509 > Project: Kafka > Issue Type: Test > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Sanjana Kaundinya >Assignee: Luke Chen >Priority: Major > Fix For: 2.5.0 > > > The test > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication > is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of > when the connectors and tasks are started up. The fix would be, once the connectors are started up, to wait until the REST endpoint > returns a positive number of tasks, so that we can be confident testing can start.
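The fix proposed in the issue description (wait until the connector REST endpoint reports a positive number of tasks before testing) amounts to polling with a deadline. A minimal sketch, assuming a hypothetical `taskCount` supplier in place of the actual REST call; `TaskWait.waitForTasks` and its parameters are illustrative and are not the Connect test utilities' API.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

final class TaskWait {
    private TaskWait() {}

    // Polls taskCount until it reports at least one task or the timeout expires.
    // Returns true if tasks appeared in time, false on timeout or interruption.
    static boolean waitForTasks(final IntSupplier taskCount,
                                final long timeoutMs,
                                final long pollIntervalMs) {
        // nanoTime is monotonic, so wall-clock adjustments cannot skew the deadline.
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (taskCount.getAsInt() > 0) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
    }
}
```

In a test, a `false` result would be turned into an assertion failure with a descriptive message, rather than letting the later consume step time out with "found 0, expected 100".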
[GitHub] [kafka] cgetzen commented on pull request #8930: Mirror Maker 2: offset-syncs variable
cgetzen commented on pull request #8930: URL: https://github.com/apache/kafka/pull/8930#issuecomment-650381294 Hi @ryannedolan, wondering if you could spare a little guidance. I created an `offset-syncs.topic.override` variable with a default of `null`. What I would have loved to do is create something like `offset-syncs.topic` with a default value of `mm2-offsets-sync..internal`, but I was unable to because of the alias in the string, and I think doing so would require a much larger change. Thoughts? I also emailed d...@kafka.apache.org to get access to create a KIP, but that seems to be a dead end. If I get together the content for a KIP, would you be willing to submit it? Thanks!
[GitHub] [kafka] gharris1727 commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop
gharris1727 commented on a change in pull request #8910: URL: https://github.com/apache/kafka/pull/8910#discussion_r446379953 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ## @@ -689,6 +692,10 @@ else if (!context.pausedPartitions().isEmpty()) @Override public void onPartitionsRevoked(Collection partitions) { +if (taskStopped) { Review comment: I think this gets called by the consumer thread, which is different from the thread which calls `close()`. I think that it may be necessary to mark this variable as volatile.
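The concern is visibility: a flag written by the thread that stops the task and read from the consumer thread's rebalance callback needs `volatile` (or other synchronization) to be reliably observed. A simplified, hypothetical sketch; `SinkTaskLifecycle` is not a class in the PR, only the `taskStopped` name comes from the diff, and the counter stands in for `preCommit` plus the offset commit.

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;

final class SinkTaskLifecycle {
    // volatile: written by the stopping thread, read by the consumer thread.
    private volatile boolean taskStopped = false;
    private final AtomicInteger commits = new AtomicInteger();

    // Called from the thread that stops the task.
    void stop() {
        taskStopped = true;
    }

    // Called on the consumer thread during a rebalance.
    void onPartitionsRevoked(final Collection<String> partitions) {
        if (taskStopped) {
            // Task already stopped: skip preCommit/commit for the revoked partitions.
            return;
        }
        commits.incrementAndGet(); // stand-in for preCommit + offset commit
    }

    int commitCount() {
        return commits.get();
    }
}
```

Note that `volatile` only guarantees visibility of the latest write; if `stop()` can race with a callback that has already passed the check, a lock around both would still be needed.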
[jira] [Updated] (KAFKA-10181) AlterConfig/IncrementalAlterConfig should go to controller
[ https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10181: Fix Version/s: 2.7.0 > AlterConfig/IncrementalAlterConfig should go to controller > -- > > Key: KAFKA-10181 > URL: https://issues.apache.org/jira/browse/KAFKA-10181 > Project: Kafka > Issue Type: Sub-task > Components: admin >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.7.0 > > > In the new Admin client, the request should always be routed towards the > controller.
[jira] [Updated] (KAFKA-10181) AlterConfig/IncrementalAlterConfig should go to controller
[ https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10181: Component/s: admin > AlterConfig/IncrementalAlterConfig should go to controller > -- > > Key: KAFKA-10181 > URL: https://issues.apache.org/jira/browse/KAFKA-10181 > Project: Kafka > Issue Type: Sub-task > Components: admin >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In the new Admin client, the request should always be routed towards the > controller.
[jira] [Commented] (KAFKA-9693) Kafka latency spikes caused by log segment flush on roll
[ https://issues.apache.org/jira/browse/KAFKA-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146596#comment-17146596 ] João Oliveirinha commented on KAFKA-9693: - [~junrao] thanks for the analysis and hypothesis. I didn't want to trigger the flush during the log append. In this case, I just configured log.flush.interval.ms=1000 and log.flush.scheduler.interval.ms=1000, so hopefully I got the same result as using the sysctl commands. But even when I used the default settings in my first tests, I could see the latency spikes. I am using 64 partitions and a replication factor of 2 for the topic. Each broker has 3 replica fetchers, so I assume that each follower fetch request includes at most ~3 partitions (assuming they are on the same broker). Regarding the throttling on the replication: 1) How can I disable it? 2) I rarely saw it happening during the entire test, but I am also inclined to believe that it is related; however, that doesn't explain the 200ms, since we are seeing throttling of 4ms, unless those somehow add up to 200ms almost every time. > Kafka latency spikes caused by log segment flush on roll > > > Key: KAFKA-9693 > URL: https://issues.apache.org/jira/browse/KAFKA-9693 > Project: Kafka > Issue Type: Improvement > Components: core > Environment: OS: Amazon Linux 2 > Kafka version: 2.2.1 >Reporter: Paolo Moriello >Assignee: Paolo Moriello >Priority: Major > Labels: Performance, latency, performance > Attachments: image-2020-03-10-13-17-34-618.png, > image-2020-03-10-14-36-21-807.png, image-2020-03-10-15-00-23-020.png, > image-2020-03-10-15-00-54-204.png, image-2020-06-23-12-24-46-548.png, > image-2020-06-23-12-24-58-788.png, image-2020-06-26-13-43-21-723.png, > image-2020-06-26-13-46-52-861.png, image-2020-06-26-14-06-01-505.png, > latency_plot2.png > > > h1. Summary > When a log segment fills up, Kafka rolls over onto a new active segment and > forces the flush of the old segment to disk.
When this happens, log segment > _append_ duration increases, causing significant latency spikes on producer(s) > and replica(s). This ticket aims to highlight the problem and propose a > simple mitigation: add a new configuration to enable/disable rolled segment > flush. > h1. 1. Phenomenon > Response time of produce requests (99th ~ 99.9th %ile) repeatedly spikes to > ~50x-200x more than usual. For instance, the 99th %ile is normally lower than > 5ms, but when this issue occurs, it reaches 100ms to 200ms. The 99.9th and 99.99th > %iles even jump to 500-700ms. > Latency spikes happen at a constant frequency (depending on the input > throughput), for short periods of time. All the producers experience a > latency increase at the same time. > h1. !image-2020-03-10-13-17-34-618.png|width=942,height=314! > {{Example of response time plot observed on a single producer.}} > URPs also occasionally appear at the same time as the latency spikes. This is > harder to reproduce, but from time to time it is possible to see a few > partitions going out of sync coinciding with a spike. > h1. 2. Experiment > h2. 2.1 Setup > Kafka cluster hosted on AWS EC2 instances. > h4. Cluster > * 15 Kafka brokers: (EC2 m5.4xlarge) > ** Disk: 1100Gb EBS volumes (4750Mbps) > ** Network: 10 Gbps > ** CPU: 16 Intel Xeon Platinum 8000 > ** Memory: 64Gb > * 3 Zookeeper nodes: m5.large > * 6 producers on 6 EC2 instances in the same region > * 1 topic, 90 partitions - replication factor=3 > h4. Broker config > Relevant configurations: > {quote}num.io.threads=8 > num.replica.fetchers=2 > offsets.topic.replication.factor=3 > num.network.threads=5 > num.recovery.threads.per.data.dir=2 > min.insync.replicas=2 > num.partitions=1 > {quote} > h4. Perf Test > * Throughput ~6000-8000 (~40-70Mb/s input + replication = ~120-210Mb/s per > broker) > * record size = 2 > * Acks = 1, linger.ms = 1, compression.type = none > * Test duration: ~20/30min > h2.
2.2 Analysis > Our analysis showed a high +correlation between log segment flush count/rate > and the latency spikes+. This indicates that the spikes in max latency are > related to Kafka behavior on rolling over new segments. > The other metrics did not show any relevant impact on any hardware component > of the cluster, e.g. CPU, memory, network traffic, disk throughput... > > !latency_plot2.png|width=924,height=308! > {{Correlation between latency spikes and log segment flush count. p50, p95, > p99, p999 and p latencies (left axis, ns) and the flush #count (right > axis, stepping blue line in plot).}} > Kafka schedules log flushing (this includes flushing the file record > containing log entries, the offset index, the timestamp index and the > transaction index)
[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on pull request #8905: URL: https://github.com/apache/kafka/pull/8905#issuecomment-650355203 Retest this please
[GitHub] [kafka] vvcephei commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on pull request #8905: URL: https://github.com/apache/kafka/pull/8905#issuecomment-650354756 Hmm. Still saw the OOME in https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3134/
[GitHub] [kafka] guozhangwang commented on pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
guozhangwang commented on pull request #8905: URL: https://github.com/apache/kafka/pull/8905#issuecomment-650354464 test this
[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on a change in pull request #8905: URL: https://github.com/apache/kafka/pull/8905#discussion_r446365992 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java ## @@ -258,34 +263,43 @@ private void logValue(final Bytes key, final BufferKey bufferKey, final BufferVa final int sizeOfBufferTime = Long.BYTES; final ByteBuffer buffer = value.serialize(sizeOfBufferTime); buffer.putLong(bufferKey.time()); - +final byte[] array = buffer.array(); ((RecordCollector.Supplier) context).recordCollector().send( -changelogTopic, -key, -buffer.array(), -V_2_CHANGELOG_HEADERS, -partition, -null, -KEY_SERIALIZER, -VALUE_SERIALIZER +changelogTopic, +key, +array, +CHANGELOG_HEADERS, +partition, +null, +KEY_SERIALIZER, +VALUE_SERIALIZER ); } private void logTombstone(final Bytes key) { ((RecordCollector.Supplier) context).recordCollector().send( -changelogTopic, -key, -null, -null, -partition, -null, -KEY_SERIALIZER, -VALUE_SERIALIZER +changelogTopic, +key, +null, +null, Review comment: I remember considering this when I added the first version header. The reason I didn't is that, since the initial version didn't have any headers, even if we change the tombstone format in the future, we'll always have to interpret a "no header, null value" record as being a "legacy format" tombstone, just like we have to interpret a "no header, non-null value" as being a "legacy format" data record. You can think of "no header" as indicating "version 0". Since we haven't changed the format of tombstones _yet_, there's no value in adding a "version 1" flag. We should just wait until we do need to make such a change (if ever).
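The "no header means version 0" convention can be expressed as a small dispatch on the optional version header. This is a hypothetical sketch: the `ChangelogVersions` class, its method names, and the single-byte header encoding are assumptions for illustration; it takes the raw header value (or `null` when the header is absent) so that it runs without Kafka on the classpath.

```java
final class ChangelogVersions {
    private ChangelogVersions() {}

    // Hypothetical: maps the (possibly absent) version header value to a format version.
    // Records written before headers existed carry no header, so "no header" is
    // treated as version 0 for both data records and tombstones.
    static int changelogVersion(final byte[] versionHeaderValue) {
        if (versionHeaderValue == null) {
            return 0;
        }
        // Assumed encoding: a single byte holding the version number.
        if (versionHeaderValue.length != 1) {
            throw new IllegalArgumentException(
                "Unexpected version header length: " + versionHeaderValue.length);
        }
        return versionHeaderValue[0];
    }

    // A null value with no header is a legacy (version 0) tombstone.
    static boolean isLegacyTombstone(final byte[] versionHeaderValue, final byte[] value) {
        return value == null && changelogVersion(versionHeaderValue) == 0;
    }
}
```

Because the "no header, null value" branch must be kept for old changelog data regardless, adding a header to tombstones now would not let the reader drop that branch, which is the point made in the comment above.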
[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on a change in pull request #8905: URL: https://github.com/apache/kafka/pull/8905#discussion_r446364196 ## File path: streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.tests.SmokeTestDriver.generate; +import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually; + +public class StreamsSmokeTest { Review comment: That's correct.
[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on a change in pull request #8905: URL: https://github.com/apache/kafka/pull/8905#discussion_r446364364 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; + +import java.nio.ByteBuffer; + +import static java.util.Objects.requireNonNull; + +final class TimeOrderedKeyValueBufferChangelogDeserializationHelper { +private TimeOrderedKeyValueBufferChangelogDeserializationHelper() {} + +static final class DeserializationResult { +private final long time; +private final Bytes key; +private final BufferValue bufferValue; + +private DeserializationResult(final long time, final Bytes key, final BufferValue bufferValue) { +this.time = time; +this.key = key; +this.bufferValue = bufferValue; +} + +long time() { +return time; +} + +Bytes key() { +return key; +} + +BufferValue bufferValue() { +return bufferValue; +} +} + + +static DeserializationResult duckTypeV2(final ConsumerRecord record, final Bytes key) { +DeserializationResult deserializationResult = null; +RuntimeException v2DeserializationException = null; +RuntimeException v3DeserializationException = null; +try { +deserializationResult = deserializeV2(record, key); +} catch (final RuntimeException e) { +v2DeserializationException = e; +} +// versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a V3 record with the +// V2 header, so we'll try duck-typing to see if this is decodable as V3 +if (deserializationResult == null) { +try { +deserializationResult = deserializeV3(record, key); +} catch (final RuntimeException e) { +v3DeserializationException = e; +} +} + +if (deserializationResult == null) { +// ok, it wasn't V3 either. 
Throw both exceptions: +final RuntimeException exception = +new RuntimeException("Couldn't deserialize record as v2 or v3: " + record, + v2DeserializationException); +exception.addSuppressed(v3DeserializationException); +throw exception; +} +return deserializationResult; +} + +private static DeserializationResult deserializeV2(final ConsumerRecord record, Review comment: sure thing!
[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on a change in pull request #8905: URL: https://github.com/apache/kafka/pull/8905#discussion_r446363798 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java ## @@ -361,26 +366,20 @@ private void restoreBatch(final Collection> batch contextualRecord.recordContext() ) ); -} else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) { -// in this case, the changelog value is a serialized BufferValue +} else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) { + +final DeserializationResult deserializationResult = duckTypeV2(record, key); Review comment: Sorry, I meant the comments in `duckTypeV2`. Basically, because we released three versions that would write data in the "v3" format but with the "v2" flag, when we see the v2 flag, the data might be in v2 format or v3 format. The only way to tell is to try to deserialize it in v2 format and, if we get an exception, try again with v3 format.
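The try-v2-then-v3 fallback described here generalizes to "try each decoder in order, return the first one that does not throw, and attach every failure if none succeeds", mirroring the `addSuppressed` pattern in `duckTypeV2`. A hypothetical generic sketch; `DuckTyping.firstSuccessful` is an invented name, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class DuckTyping {
    private DuckTyping() {}

    // Tries each decoder in order and returns the first result that does not throw.
    // If all fail, throws one exception whose cause is the first failure, with the
    // remaining failures attached as suppressed exceptions.
    @SafeVarargs
    static <I, O> O firstSuccessful(final I input, final Function<I, O>... decoders) {
        final List<RuntimeException> failures = new ArrayList<>();
        for (final Function<I, O> decoder : decoders) {
            try {
                return decoder.apply(input);
            } catch (final RuntimeException e) {
                failures.add(e);
            }
        }
        final RuntimeException exception = new RuntimeException(
            "Couldn't decode input with any of " + decoders.length + " decoders",
            failures.isEmpty() ? null : failures.get(0));
        for (int i = 1; i < failures.size(); i++) {
            exception.addSuppressed(failures.get(i));
        }
        throw exception;
    }
}
```

Keeping every failure matters for debugging: when a record matches neither format, both stack traces explain why.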
[GitHub] [kafka] guozhangwang commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
guozhangwang commented on a change in pull request #8905: URL: https://github.com/apache/kafka/pull/8905#discussion_r446343541 ## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ## @@ -189,8 +192,8 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version): processor.stop() processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False) -@matrix(from_version=metadata_2_versions, to_version=metadata_2_versions) -def test_simple_upgrade_downgrade(self, from_version, to_version): +@matrix(from_version=smoke_test_versions, to_version=dev_version) Review comment: +1, I think this is a great find. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java ## @@ -361,26 +366,20 @@ private void restoreBatch(final Collection> batch contextualRecord.recordContext() ) ); -} else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) { -// in this case, the changelog value is a serialized BufferValue +} else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) { + +final DeserializationResult deserializationResult = duckTypeV2(record, key); Review comment: Could you clarify which comment you are referring to? I did not see any comments for the "restoreBatch" method. ## File path: streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.tests.SmokeTestDriver.generate; +import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually; + +public class StreamsSmokeTest { Review comment: I'm assuming the 22..25 client / driver code are all copy-pastes here, so I skipped reviewing them. LMK if they aren't. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; + +import java.nio.ByteBuffer; + +import static java.util.Objects.requireNonNull; + +final class TimeOrderedKeyValueBufferChangelogDeserializationHelper { +private TimeOrderedKeyValueBufferChangelogDeserializationHelper() {} + +static final class DeserializationResult { +private final long time; +private final Bytes key; +private final BufferValue bufferValue; + +private DeserializationResult(final long time, final Bytes key, final BufferValue bufferValue) { +this.time =
[GitHub] [kafka] ijuma commented on pull request #8579: KAFKA-9930: Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion.
ijuma commented on pull request #8579: URL: https://github.com/apache/kafka/pull/8579#issuecomment-650319283 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest - call KafkaStreams#cleanU…
ableegoldman commented on pull request #8913: URL: https://github.com/apache/kafka/pull/8913#issuecomment-650316087 The system tests I'm triggering keep failing, but not because of this PR. Not sure what's going on :/ But I'll try again: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3990/
[GitHub] [kafka] efeg commented on a change in pull request #8579: KAFKA-9930: Prevent ReplicaFetcherThread from throwing UnknownTopicOrPartitionException upon topic creation and deletion.
efeg commented on a change in pull request #8579: URL: https://github.com/apache/kafka/pull/8579#discussion_r446327952 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -382,6 +382,12 @@ abstract class AbstractFetcherThread(name: String, "that the partition is being moved") partitionsWithError += topicPartition +case Errors.UNKNOWN_TOPIC_OR_PARTITION => + warn(s"Receiving ${Errors.UNKNOWN_TOPIC_OR_PARTITION} from the leader for partition $topicPartition. " + + s"This can happen transiently if the partition is being created or deleted. " + + s"However, this is unexpected if it sustains.") Review comment: @ijuma thanks for the update -- let me know if further changes are needed.
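The warning above treats UNKNOWN_TOPIC_OR_PARTITION from the leader as expected while a partition is being created or deleted, but "unexpected if it sustains." A minimal Java sketch of telling the two cases apart by counting consecutive errors per partition follows. It is not the Scala AbstractFetcherThread logic; the class name, the string partition keys, and the escalation threshold are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Kafka): counts consecutive
 * UNKNOWN_TOPIC_OR_PARTITION responses per partition so a caller can retry
 * quietly at first and escalate only when the error persists.
 */
final class UnknownPartitionTracker {
    private final int escalateAfter; // assumed threshold, not a Kafka config
    private final Map<String, Integer> consecutive = new HashMap<>();

    UnknownPartitionTracker(int escalateAfter) {
        if (escalateAfter < 1) {
            throw new IllegalArgumentException("escalateAfter must be >= 1");
        }
        this.escalateAfter = escalateAfter;
    }

    /** Records one error for the partition; returns true once the streak looks sustained. */
    boolean recordError(String topicPartition) {
        int count = consecutive.merge(topicPartition, 1, Integer::sum);
        return count >= escalateAfter;
    }

    /** A successful fetch for the partition resets its error streak. */
    void recordSuccess(String topicPartition) {
        consecutive.remove(topicPartition);
    }

    /** Current number of consecutive errors for the partition. */
    int streak(String topicPartition) {
        return consecutive.getOrDefault(topicPartition, 0);
    }
}
```

With a threshold of 3, the first two errors for a partition would only be retried, and the third would be reported as sustained; any successful fetch in between starts the count over.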
[GitHub] [kafka] omkreddy closed pull request #8932: MINOR: rename "NOT_INITALIZED" to "NOT_INITIALIZED"
omkreddy closed pull request #8932: URL: https://github.com/apache/kafka/pull/8932
[GitHub] [kafka] andrewchoi5 commented on a change in pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
andrewchoi5 commented on a change in pull request #8479: URL: https://github.com/apache/kafka/pull/8479#discussion_r446318540 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -499,7 +500,16 @@ class Partition(val topicPartition: TopicPartition, addingReplicas = addingReplicas, removingReplicas = removingReplicas ) - createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) + try { +this.createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) + } catch { +case e: ZooKeeperClientException => + stateChangeLogger.info(s"A ZooKeeper client exception has occurred and makeLeader will be skipping the " + +s"state change for the partition with leader epoch: $leaderEpoch ", e) + error(s"ZooKeeper client error occurred while this partition was becoming the leader for $topicPartition.", e) Review comment: Makes sense. Would appreciate a review on the change -- @junrao.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
ableegoldman commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r446316588 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java ## @@ -124,18 +141,19 @@ public void before() { Serdes.String() ); metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); -expect(context.metrics()) -.andReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion)).anyTimes(); -expect(context.taskId()).andReturn(taskId).anyTimes(); -expect(inner.name()).andReturn("metered").anyTimes(); +expect(context.applicationId()).andStubReturn(APPLICATION_ID); +expect(context.metrics()).andStubReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion)); +expect(context.taskId()).andStubReturn(taskId); + expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC); Review comment: I think Bruno needs to give us all a short lesson on the correct usage of EasyMock. He explained the `StubReturn` thing to me on another PR a while back and I've been (slowly) trying to help migrate all the `.andReturn.anyTimes` usages over to this where appropriate (which is most places). It's definitely helped reduce the large number of EasyMock'ed tests that have to be fixed after every minor implementation change. That said, there does seem to be one critical difference between using `.andStubReturn` and the actual `MockInternalProcessorContext`. If we add a new method to the `InternalProcessorContext` interface, for example, we then have to add this expectation to every test that calls it with an EasyMock context. Having had to do this a number of times, it's definitely a huge timesuck. But I also agree that it doesn't need to be done as part of this PR. Maybe once the MockInternal and InternalMock processor contexts are finally merged we can do some reasonable cleanup of the tests.
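The comment above contrasts per-test EasyMock expectations, which every test has to extend when InternalProcessorContext gains a method, with a shared hand-written context such as MockInternalProcessorContext. A stdlib-only sketch of the hand-written approach follows. The `Context` interface is a hypothetical, trimmed-down stand-in and does not reproduce the real interface.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a processor context; not Kafka's InternalProcessorContext. */
interface Context {
    String applicationId();
    String taskId();
    String changelogFor(String storeName);
}

/**
 * Reusable fake configured once and shared across tests. When the interface
 * grows, only this class needs a new method, rather than every test adding a
 * new mock expectation.
 */
final class FakeContext implements Context {
    private final String applicationId;
    private final String taskId;
    private final Map<String, String> changelogs = new HashMap<>();

    FakeContext(String applicationId, String taskId) {
        this.applicationId = applicationId;
        this.taskId = taskId;
    }

    /** Registers the changelog topic to report for a store; returns this for chaining. */
    FakeContext withChangelog(String storeName, String changelogTopic) {
        changelogs.put(storeName, changelogTopic);
        return this;
    }

    @Override
    public String applicationId() {
        return applicationId;
    }

    @Override
    public String taskId() {
        return taskId;
    }

    /** Returns null when no changelog was registered for the store. */
    @Override
    public String changelogFor(String storeName) {
        return changelogs.get(storeName);
    }
}
```

The trade-off is the one the comment points at. A fake must be kept in sync with the interface in a single place. Per-test stubs let each test state exactly what it depends on, but adding a method to the interface means touching many tests.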
[GitHub] [kafka] junrao commented on a change in pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
junrao commented on a change in pull request #8479: URL: https://github.com/apache/kafka/pull/8479#discussion_r446316668 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -499,7 +500,16 @@ class Partition(val topicPartition: TopicPartition, addingReplicas = addingReplicas, removingReplicas = removingReplicas ) - createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) + try { +this.createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) + } catch { +case e: ZooKeeperClientException => + stateChangeLogger.info(s"A ZooKeeper client exception has occurred and makeLeader will be skipping the " + +s"state change for the partition with leader epoch: $leaderEpoch ", e) + error(s"ZooKeeper client error occurred while this partition was becoming the leader for $topicPartition.", e) Review comment: I meant removing line 509, error(s"ZooKeeper client ...
[jira] [Commented] (KAFKA-9693) Kafka latency spikes caused by log segment flush on roll
[ https://issues.apache.org/jira/browse/KAFKA-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146527#comment-17146527 ] Jun Rao commented on KAFKA-9693: [~joliveirinha]: Thanks for the analysis. To trigger the flush during log append, did you set log.flush.interval.messages to 1? The high remote time in produce could be caused by (1) slow processing of the follower fetch request on the leader or (2) the follower not issuing fetch requests quickly enough. Since fetchFollower time is not high, (1) doesn't seem to be the cause. (2) can be caused by follower log append delay. How many partitions did you use in the test? A fetch follower request can include multiple partitions, so the delay can add up. (2) can also be caused by replication throttling. The non-zero follower throttling time typically indicates that replication throttling is engaged. > Kafka latency spikes caused by log segment flush on roll > > > Key: KAFKA-9693 > URL: https://issues.apache.org/jira/browse/KAFKA-9693 > Project: Kafka > Issue Type: Improvement > Components: core > Environment: OS: Amazon Linux 2 > Kafka version: 2.2.1 >Reporter: Paolo Moriello >Assignee: Paolo Moriello >Priority: Major > Labels: Performance, latency, performance > Attachments: image-2020-03-10-13-17-34-618.png, > image-2020-03-10-14-36-21-807.png, image-2020-03-10-15-00-23-020.png, > image-2020-03-10-15-00-54-204.png, image-2020-06-23-12-24-46-548.png, > image-2020-06-23-12-24-58-788.png, image-2020-06-26-13-43-21-723.png, > image-2020-06-26-13-46-52-861.png, image-2020-06-26-14-06-01-505.png, > latency_plot2.png > > > h1. Summary > When a log segment fills up, Kafka rolls over onto a new active segment and > forces the flush of the old segment to disk. When this happens, log segment > _append_ duration increases, causing significant latency spikes on producer(s) > and replica(s).
This ticket aims to highlight the problem and propose a > simple mitigation: add a new configuration to enable/disable rolled segment > flush. > h1. 1. Phenomenon > Response time of produce request (99th ~ 99.9th %ile) repeatedly spikes to > ~50x-200x more than usual. For instance, normally 99th %ile is lower than > 5ms, but when this issue occurs, it reaches 100ms to 200ms. 99.9th and 99.99th > %iles even jump to 500-700ms. > Latency spikes happen at constant frequency (depending on the input > throughput), for small amounts of time. All the producers experience a > latency increase at the same time. > h1. !image-2020-03-10-13-17-34-618.png|width=942,height=314! > {{Example of response time plot observed on a single producer.}} > URPs also appear, rarely, alongside the latency spikes. This is > harder to reproduce, but from time to time it is possible to see a few > partitions going out of sync at the time of a spike. > h1. 2. Experiment > h2. 2.1 Setup > Kafka cluster hosted on AWS EC2 instances. > h4. Cluster > * 15 Kafka brokers: (EC2 m5.4xlarge) > ** Disk: 1100Gb EBS volumes (4750Mbps) > ** Network: 10 Gbps > ** CPU: 16 Intel Xeon Platinum 8000 > ** Memory: 64Gb > * 3 ZooKeeper nodes: m5.large > * 6 producers on 6 EC2 instances in the same region > * 1 topic, 90 partitions - replication factor=3 > h4. Broker config > Relevant configurations: > {quote}num.io.threads=8 > num.replica.fetchers=2 > offsets.topic.replication.factor=3 > num.network.threads=5 > num.recovery.threads.per.data.dir=2 > min.insync.replicas=2 > num.partitions=1 > {quote} > h4. Perf Test > * Throughput ~6000-8000 (~40-70Mb/s input + replication = ~120-210Mb/s per > broker) > * record size = 2 > * Acks = 1, linger.ms = 1, compression.type = none > * Test duration: ~20/30min > h2. 2.2 Analysis > Our analysis showed a high +correlation between log segment flush count/rate > and the latency spikes+.
This indicates that the spikes in max latency are > related to Kafka behavior on rolling over new segments. > Other metrics showed no relevant impact on any hardware component > of the cluster, e.g. CPU, memory, network traffic, disk throughput. > > !latency_plot2.png|width=924,height=308! > {{Correlation between latency spikes and log segment flush count. p50, p95, > p99, p999 and p latencies (left axis, ns) and the flush #count (right > axis, stepping blue line in plot).}} > Kafka schedules log flushing (this includes flushing the file record > containing log entries, the offset index, the timestamp index and the > transaction index) during _roll_ operations. A log is rolled over onto a new > empty log when: > * the log segment is full > * the max time has elapsed since the timestamp of the first message in the > segment (or, in absence of it, since the create time) > * the index
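The roll conditions in the quoted description, together with the proposed switch for flushing the rolled segment, can be modeled as a toy Java predicate. It covers only the two conditions that survive in this archive: the segment is full, or the maximum age has passed since the first message's timestamp, falling back to the create time. The third condition is cut off after "the index" and is deliberately left out. The size and age limits correspond loosely to Kafka's segment.bytes and segment.ms. RollPolicy, its parameters, and the flushOnRoll flag are hypothetical and do not reproduce Kafka's Log/LogSegment code.

```java
/**
 * Toy model of the roll decision described in KAFKA-9693 (hypothetical names).
 * Only the size and age conditions are modeled; the ticket's third condition is
 * truncated in this archive.
 */
final class RollPolicy {
    private final long maxSegmentBytes;  // loosely analogous to segment.bytes
    private final long maxSegmentAgeMs;  // loosely analogous to segment.ms
    private final boolean flushOnRoll;   // the proposed enable/disable switch (hypothetical)

    RollPolicy(long maxSegmentBytes, long maxSegmentAgeMs, boolean flushOnRoll) {
        this.maxSegmentBytes = maxSegmentBytes;
        this.maxSegmentAgeMs = maxSegmentAgeMs;
        this.flushOnRoll = flushOnRoll;
    }

    /**
     * @param currentSizeBytes size of the active segment
     * @param incomingBytes    size of the batch about to be appended
     * @param firstTimestampMs timestamp of the first message, or null if the segment has none
     * @param createTimeMs     segment create time, used when there is no first timestamp
     * @param nowMs            current time
     */
    boolean shouldRoll(long currentSizeBytes, int incomingBytes,
                       Long firstTimestampMs, long createTimeMs, long nowMs) {
        boolean full = currentSizeBytes + incomingBytes > maxSegmentBytes;
        long base = (firstTimestampMs != null) ? firstTimestampMs : createTimeMs;
        boolean expired = nowMs - base > maxSegmentAgeMs;
        return full || expired;
    }

    /** Whether the old segment would be force-flushed when a roll happens. */
    boolean flushRolledSegment() {
        return flushOnRoll;
    }
}
```

In this model the latency issue is isolated to flushRolledSegment(). The proposal would keep the roll decision as is and change only whether the rolled segment is synchronously flushed.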
[GitHub] [kafka] ijuma commented on pull request #8931: MINOR: Update Scala to 2.13.3
ijuma commented on pull request #8931: URL: https://github.com/apache/kafka/pull/8931#issuecomment-650297987 One job passed, two had a single unrelated test failure.
[GitHub] [kafka] ijuma merged pull request #8931: MINOR: Update Scala to 2.13.3
ijuma merged pull request #8931: URL: https://github.com/apache/kafka/pull/8931
[jira] [Commented] (KAFKA-10166) Excessive TaskCorruptedException seen in testing
[ https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146501#comment-17146501 ] Sophie Blee-Goldman commented on KAFKA-10166: - Thanks for the detailed analysis, Bruno. In that case we are just waiting on the one PR, which should be mergeable sometime today. > Excessive TaskCorruptedException seen in testing > > > Key: KAFKA-10166 > URL: https://issues.apache.org/jira/browse/KAFKA-10166 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Bruno Cadonna >Priority: Blocker > Fix For: 2.6.0 > > > As the title indicates, long-running test applications with injected network > "outages" seem to hit TaskCorruptedException more than expected. > Seen occasionally on the ALOS application (~20 times in two days in one case, > for example), and very frequently with EOS (many times per day)
[GitHub] [kafka] andrewchoi5 commented on a change in pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
andrewchoi5 commented on a change in pull request #8479: URL: https://github.com/apache/kafka/pull/8479#discussion_r446309532 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -499,7 +500,16 @@ class Partition(val topicPartition: TopicPartition, addingReplicas = addingReplicas, removingReplicas = removingReplicas ) - createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) + try { +this.createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) + } catch { +case e: ZooKeeperClientException => + stateChangeLogger.info(s"A ZooKeeper client exception has occurred and makeLeader will be skipping the " + +s"state change for the partition with leader epoch: $leaderEpoch ", e) + error(s"ZooKeeper client error occurred while this partition was becoming the leader for $topicPartition.", e) Review comment: Thanks @junrao -- I will change the logging level to error. Could you clarify what you mean by "keep the state change logging"?
[GitHub] [kafka] ableegoldman commented on a change in pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams
ableegoldman commented on a change in pull request #8929: URL: https://github.com/apache/kafka/pull/8929#discussion_r446308395 ## File path: gradle/spotbugs-exclude.xml ## @@ -348,14 +348,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - - - + - - - - + + Review comment: Fair enough. I eagerly await your alternative proposal :P
[GitHub] [kafka] junrao commented on a change in pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
junrao commented on a change in pull request #8479: URL: https://github.com/apache/kafka/pull/8479#discussion_r446306280 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -499,7 +500,16 @@ class Partition(val topicPartition: TopicPartition, addingReplicas = addingReplicas, removingReplicas = removingReplicas ) - createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) + try { +this.createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) + } catch { +case e: ZooKeeperClientException => + stateChangeLogger.info(s"A ZooKeeper client exception has occurred and makeLeader will be skipping the " + +s"state change for the partition with leader epoch: $leaderEpoch ", e) + error(s"ZooKeeper client error occurred while this partition was becoming the leader for $topicPartition.", e) Review comment: We probably can just keep the state change logging. Also, the logging level probably should be error instead of info. Ditto below.
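The review thread on #8479 settles on catching the ZooKeeper client exception around log creation, writing a single state-change log entry at error level, and skipping the state change for that partition. Below is a Java sketch of that control flow, offered as a rough illustration rather than a translation of the Scala in Partition.scala. ZkClientException, LogCreator, and LeaderTransition are hypothetical stand-ins, and the logger is simplified to a Consumer<String> so the behavior is easy to observe.

```java
import java.util.function.Consumer;

/** Stand-in for ZooKeeperClientException (hypothetical; not the real Kafka class). */
class ZkClientException extends RuntimeException {
    ZkClientException(String message) {
        super(message);
    }
}

/** Hypothetical log-creation step that may need ZooKeeper. */
interface LogCreator {
    void createLogIfNotExists(String topicPartition);
}

/** Sketch of a makeLeader step that skips the state change on a ZooKeeper client error. */
final class LeaderTransition {
    private final LogCreator logCreator;
    private final Consumer<String> stateChangeError; // single error-level sink, as suggested in review

    LeaderTransition(LogCreator logCreator, Consumer<String> stateChangeError) {
        this.logCreator = logCreator;
        this.stateChangeError = stateChangeError;
    }

    /** Returns true if the leader transition proceeded, false if it was skipped. */
    boolean makeLeader(String topicPartition, int leaderEpoch) {
        try {
            logCreator.createLogIfNotExists(topicPartition);
        } catch (ZkClientException e) {
            // Log once, at error level, then skip this partition's state change.
            stateChangeError.accept("ZooKeeper client exception for " + topicPartition
                    + "; skipping state change with leader epoch " + leaderEpoch
                    + ": " + e.getMessage());
            return false;
        }
        // Remaining leader bookkeeping would follow here (omitted).
        return true;
    }
}
```

Returning false instead of rethrowing lets a caller handling many partitions continue with the rest. That matches the intent of the PR title, which is to finish operations for the other partitions despite the ZooKeeper exception.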
[GitHub] [kafka] vvcephei commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task
vvcephei commented on pull request #8926: URL: https://github.com/apache/kafka/pull/8926#issuecomment-650288339 Test this please
[GitHub] [kafka] vvcephei commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task
vvcephei commented on pull request #8926: URL: https://github.com/apache/kafka/pull/8926#issuecomment-650288465 Test this please
[GitHub] [kafka] abbccdda commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
abbccdda commented on pull request #8902: URL: https://github.com/apache/kafka/pull/8902#issuecomment-650275504 retest this please
[GitHub] [kafka] abbccdda commented on pull request #8914: MINOR: Do not swallow exception when collecting PIDs
abbccdda commented on pull request #8914: URL: https://github.com/apache/kafka/pull/8914#issuecomment-650275130 retest this please
[GitHub] [kafka] vvcephei commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams
vvcephei commented on pull request #8929: URL: https://github.com/apache/kafka/pull/8929#issuecomment-650270782
[GitHub] [kafka] vvcephei commented on pull request #8929: KAFKA-4996: Fix findbugs multithreaded correctness warnings for streams
vvcephei commented on pull request #8929: URL: https://github.com/apache/kafka/pull/8929#issuecomment-650271007 Retest this please
[GitHub] [kafka] vvcephei commented on a change in pull request #8905: KAFKA-10173: Fix suppress changelog binary schema compatibility
vvcephei commented on a change in pull request #8905: URL: https://github.com/apache/kafka/pull/8905#discussion_r446265559 ## File path: build.gradle ## @@ -97,7 +97,7 @@ ext { buildVersionFileName = "kafka-version.properties" defaultMaxHeapSize = "2g" - defaultJvmArgs = ["-Xss4m", "-XX:+UseParallelGC"] + defaultJvmArgs = ["-Xss4m"] Review comment: @ijuma, you'll probably want to know about this. I have no idea why, but one of the new tests in this PR was failing with:
```
java.lang.OutOfMemoryError: Java heap space
    at org.apache.kafka.streams.kstream.internals.FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(FullChangeSerde.java:82)
    at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV2(TimeOrderedKeyValueBufferChangelogDeserializationHelper.java:90)
    at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2(TimeOrderedKeyValueBufferChangelogDeserializationHelper.java:61)
    at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:369)
    at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$284/0x0001002cb440.restoreBatch(Unknown Source)
    at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferTest.shouldRestoreV3FormatWithV2Header(TimeOrderedKeyValueBufferTest.java:742)
```
I captured a flight recording and a heap dump on exit, but everything looked fine, and the heap was only a few megs at the time of the crash. I first noticed that if I just overrode all the JVM args, the test would pass, and through trial and error I identified this one as the "cause". I get an OOMe every time with `-XX:+UseParallelGC` and I've never gotten it without the flag. WDYT about dropping it?
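When a JVM flag such as -XX:+UseParallelGC seems to change a test's behavior, it can help to confirm from inside the test JVM which collectors are active and which arguments were actually passed. The sketch below relies only on standard java.lang.management APIs and Runtime.maxMemory(). JvmGcReport is a hypothetical helper, and collector names differ across JVM vendors and versions, so its output should be treated as informational.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic helper for checking which GC setup a test JVM is running. */
final class JvmGcReport {
    private JvmGcReport() {}

    /** Names of the garbage collectors registered in this JVM (they vary by JVM and GC choice). */
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    /** True if the exact flag string was passed to this JVM, e.g. "-XX:+UseParallelGC". */
    static boolean hasJvmFlag(String flag) {
        return ManagementFactory.getRuntimeMXBean().getInputArguments().contains(flag);
    }

    /** Maximum heap the JVM will attempt to use, in bytes. */
    static long maxHeapBytes() {
        return Runtime.getRuntime().maxMemory();
    }
}
```

Printing these three values at the start of a failing test shows whether the build's defaultJvmArgs reached the forked test JVM. It also shows which maximum heap the test actually ran with.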
## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java ## @@ -56,14 +55,13 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.CHANGELOG_HEADERS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.Assert.fail; @RunWith(Parameterized.class) public class TimeOrderedKeyValueBufferTest> { -private static final RecordHeaders V_2_CHANGELOG_HEADERS = -new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})}); Review comment: imported the headers from the production code, so that it'll stay current. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java ## @@ -104,10 +104,6 @@ public void shouldWorkWithRebalance() throws InterruptedException { clients.add(smokeTestClient); smokeTestClient.start(props); -while (!clients.get(clients.size() - 1).started()) { -Thread.sleep(100); -} - Review comment: Don't need this anymore because `start` blocks until it's "started" now. 
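The removed test constant built a record header with key "v" and a single version byte (2), and the restore path in the stack trace goes through duckTypeV2. The sketch below shows one way to dispatch on a one-byte version header. To stay stdlib-only, headers are modeled as a Map<String, byte[]> rather than Kafka's Headers. The enum values and the rule that a missing header means the legacy format are assumptions for illustration. The real restore code must also handle records whose header does not match their payload, as the test name shouldRestoreV3FormatWithV2Header suggests, and this sketch makes no attempt to do so.

```java
import java.util.Map;

/** Hypothetical format detection from a one-byte "v" header; not InMemoryTimeOrderedKeyValueBuffer's logic. */
final class ChangelogFormat {
    private ChangelogFormat() {}

    /** Illustrative format identifiers (assumed for this sketch). */
    enum Version { LEGACY, V2, V3 }

    /**
     * Picks a format from the "v" header. Records without the header are
     * treated as the legacy format (an assumption made for this sketch).
     */
    static Version detect(Map<String, byte[]> headers) {
        byte[] v = headers.get("v");
        if (v == null) {
            return Version.LEGACY;
        }
        if (v.length != 1) {
            throw new IllegalArgumentException("expected a single version byte, got " + v.length);
        }
        switch (v[0]) {
            case 2:
                return Version.V2;
            case 3:
                return Version.V3;
            default:
                throw new IllegalArgumentException("unknown changelog format version " + v[0]);
        }
    }
}
```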
## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ## @@ -38,107 +37,128 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; import java.time.Instant; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; public class SmokeTestClient extends SmokeTestUtil { private final String name; -private Thread thread; private KafkaStreams streams; private boolean uncaughtException = false; -private boolean started; -private boolean closed; +private volatile boolean closed; -public SmokeTestClient(final String name) { -super(); -this.name = name; +private static void addShutdownHook(final String name, final Runnable runnable) { +if (name != null) { +Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); +} else { +Runtime.getRuntime().addShutdownHook(new Thread(runnable)); +} } -public boolean started() { -return started; +private static File tempDirectory() { +final String prefix = "kafka-"; +final File file; +try { +file =
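The integration test no longer polls started() in a sleep loop because SmokeTestClient.start now blocks until the client has started, and the diff adds CountDownLatch and TimeUnit imports plus a volatile closed flag. Below is a minimal sketch of that blocking-start pattern. BlockingStartClient is hypothetical, and the readiness signal comes from a plain background thread. In the real client it would presumably come from a KafkaStreams state change, which is an assumption here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical client whose start() blocks until a background thread signals readiness. */
final class BlockingStartClient {
    private final CountDownLatch started = new CountDownLatch(1);
    private volatile boolean closed; // volatile so the worker thread sees close()
    private Thread worker;

    /** Starts the background work and waits up to the timeout for it to report readiness. */
    void start(long timeout, TimeUnit unit) {
        worker = new Thread(() -> {
            // Initialization work would go here.
            started.countDown(); // signal readiness (stands in for a "running" state callback)
            while (!closed) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "blocking-start-client");
        worker.setDaemon(true);
        worker.start();
        try {
            if (!started.await(timeout, unit)) {
                throw new IllegalStateException("client did not start within " + timeout + " " + unit);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for client to start", e);
        }
    }

    boolean isStarted() {
        return started.getCount() == 0;
    }

    /** Stops the worker and waits for it to exit. */
    void close() {
        closed = true;
        if (worker != null) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Because start() returns only after readiness or throws after the timeout, a caller such as the integration test can drop its polling loop and still get a bounded wait.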
[jira] [Commented] (KAFKA-10206) Admin can transiently return incorrect results about topics
[ https://issues.apache.org/jira/browse/KAFKA-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17146444#comment-17146444 ] Tom Bentley commented on KAFKA-10206: - I think the broker needs some alternative to replying with invalid data until it has received topic data from the controller. There are no meaningful retriable errors that the broker could return (and older clients would not expect these). The broker could delay responding to a metadata request until it had received topic data from the controller. That's complicated by the fact that the initial UPDATE_METADATA request from the controller lacks topic data. While using a counter would work most of the time, it is not safe if the controller didn't send the second UPDATE_METADATA request (e.g. due to controller failover). An alternative to using a counter would be to distinguish in the UPDATE_METADATA request between an empty topic list and a null topic list. Then there's the question of how long the broker should wait before responding. The alternative to waiting would be to return some new retriable error code to the client (which could then try another broker). [~ijuma], [~cmccabe] do you have any better ideas about how best to address this? > Admin can transiently return incorrect results about topics > --- > > Key: KAFKA-10206 > URL: https://issues.apache.org/jira/browse/KAFKA-10206 > Project: Kafka > Issue Type: Bug > Components: admin, core >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Major > > When a broker starts up it can handle metadata requests before it has > received UPDATE_METADATA requests from the controller. > This manifests in the admin client via: > * listTopics returning an empty list > * describeTopics and describeConfigs of topics erroneously returning > TopicOrPartitionNotFoundException > I assume this also affects the producer and consumer, though since > `UnknownTopicOrPartitionException` is retriable those clients recover.
> Testing locally suggests that the window for this happening is typically <1s. > There doesn't seem to be any way for the caller of the Admin client to detect > this situation.
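One option in the comment above is to distinguish, in the UPDATE_METADATA request, a null topic list (no topic data yet) from an empty one (there really are no topics). This lets the broker tell "not initialized" apart from "genuinely empty" and then either delay its response or return a retriable error. The Java sketch below shows only that broker-side distinction. TopicMetadataCache is hypothetical, is not Kafka's MetadataCache, and leaves the choice between delaying and erroring to the caller.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical broker-side cache that keeps "no topic data yet" (null) distinct
 * from "no topics exist" (empty list). Not Kafka's MetadataCache.
 */
final class TopicMetadataCache {
    private volatile List<String> topics; // null until an update that carries topic data arrives

    /** Applies an UPDATE_METADATA-like update; a null list means the update carried no topic data. */
    void onUpdateMetadata(List<String> topicsOrNull) {
        if (topicsOrNull != null) {
            topics = Collections.unmodifiableList(new ArrayList<>(topicsOrNull));
        }
    }

    /**
     * Returns the known topics, or Optional.empty() while the broker has no topic
     * data; a request handler could then delay the response or return a retriable error.
     */
    Optional<List<String>> listTopics() {
        return Optional.ofNullable(topics);
    }
}
```

In this model a freshly started broker answers "not ready" rather than an empty list, which is the situation in which listTopics could otherwise transiently return an empty list.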
[jira] [Created] (KAFKA-10206) Admin can transiently return incorrect results about topics
Tom Bentley created KAFKA-10206: --- Summary: Admin can transiently return incorrect results about topics Key: KAFKA-10206 URL: https://issues.apache.org/jira/browse/KAFKA-10206 Project: Kafka Issue Type: Bug Components: admin, core Reporter: Tom Bentley Assignee: Tom Bentley When a broker starts up it can handle metadata requests before it has received UPDATE_METADATA requests from the controller. This manifests in the admin client via: * listTopics returning an empty list * describeTopics and describeConfigs of topics erroneously returning TopicOrPartitionNotFoundException I assume this also affects the producer and consumer, though since `UnknownTopicOrPartitionException` is retriable those clients recover. Testing locally suggests that the window for this happening is typically <1s. There doesn't seem to be any way for the caller of the Admin client to detect this situation.
[GitHub] [kafka] guozhangwang commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task
guozhangwang commented on pull request #8926: URL: https://github.com/apache/kafka/pull/8926#issuecomment-650261830 > Seems like all the Topology testDriver tests failed, but I got a green build running locally. Do they not run with `./gradlew streams:test`? They are included in streams:test. Maybe try rebasing the branch to see if any commits are missing?
[GitHub] [kafka] ableegoldman commented on pull request #8926: KAFKA-10166: always write checkpoint before closing an (initialized) task
ableegoldman commented on pull request #8926: URL: https://github.com/apache/kafka/pull/8926#issuecomment-650251400 Seems like all the Topology testDriver tests failed, but I got a green build running locally. Do they not run with `./gradlew streams:test`?
[GitHub] [kafka] vvcephei commented on a change in pull request #8927: KAFKA-10200: Fix testability of PAPI with windowed stores
vvcephei commented on a change in pull request #8927: URL: https://github.com/apache/kafka/pull/8927#discussion_r446263222 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; + +public final class ProcessorContextUtils { Review comment: Thanks!