[jira] [Resolved] (KAFKA-9538) Flaky Test `testResetOffsetsExportImportPlan`
[ https://issues.apache.org/jira/browse/KAFKA-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-9538. Resolution: Fixed > Flaky Test `testResetOffsetsExportImportPlan` > - > > Key: KAFKA-9538 > URL: https://issues.apache.org/jira/browse/KAFKA-9538 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: huxihx >Priority: Major > > {code} > 19:44:41 > 19:44:41 kafka.admin.ResetConsumerGroupOffsetTest > > testResetOffsetsExportImportPlan FAILED > 19:44:41 java.lang.AssertionError: expected: 2, bar2-1 -> > 2)> but was: > 19:44:41 at org.junit.Assert.fail(Assert.java:89) > 19:44:41 at org.junit.Assert.failNotEquals(Assert.java:835) > 19:44:41 at org.junit.Assert.assertEquals(Assert.java:120) > 19:44:41 at org.junit.Assert.assertEquals(Assert.java:146) > 19:44:41 at > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan(ResetConsumerGroupOffsetTest.scala:429) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9538) Flaky Test `testResetOffsetsExportImportPlan`
[ https://issues.apache.org/jira/browse/KAFKA-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035959#comment-17035959 ] ASF GitHub Bot commented on KAFKA-9538: --- hachikuji commented on pull request #6561: KAFKA-9538: Flaky test: testResetOffsetsExportImportPlan URL: https://github.com/apache/kafka/pull/6561 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean
[ https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035930#comment-17035930 ] Ismael Juma commented on KAFKA-9504: I believe this was fixed in the 2.4 branch (and later). > Memory leak in KafkaMetrics registered to MBean > Key: KAFKA-9504 > URL: https://issues.apache.org/jira/browse/KAFKA-9504 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 2.4.0 > Reporter: Andreas Holmén > Priority: Major > > After close() is called on a KafkaConsumer, some registered MBeans are not unregistered, causing a leak.
> {code:java}
> import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
> import java.lang.management.ManagementFactory;
> import java.util.HashMap;
> import java.util.Map;
> import javax.management.MBeanServer;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
> public class Leaker {
>     private static String bootstrapServers = "hostname:9092";
>
>     public static void main(String[] args) throws InterruptedException {
>         MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
>         Map<String, Object> props = new HashMap<>();
>         props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>
>         int beans = mBeanServer.getMBeanCount();
>         for (int i = 0; i < 100; i++) {
>             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props,
>                 new ByteArrayDeserializer(), new ByteArrayDeserializer());
>             consumer.close();
>         }
>         int newBeans = mBeanServer.getMBeanCount();
>         System.out.println("\nbeans delta: " + (newBeans - beans));
>     }
> }
> {code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean
[ https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035929#comment-17035929 ] Junyong Li edited comment on KAFKA-9504 at 2/13/20 5:31 AM: I may have found the cause: KafkaConsumerMetrics and SelectorMetrics use the same metricGrpName, "consumer-metrics", and SelectorMetrics calls metrics.removeMetric() in its close(). Because KafkaConsumerMetrics does not remove the metrics it added during initialization, metrics.removeMetric() eventually triggers JmxReporter.java:101, which re-registers the deleted MBean named "consumer-metrics". We could either change the metricGrpName of SelectorMetrics to "consumer-selector-metrics", or adjust the closing order of components in KafkaConsumer#close() and move Utils.closeQuietly(metrics, ...) after AppInfoParser.unregisterAppInfo(...). was (Author: lijy83): I may have found the cause: KafkaConsumerMetrics and SelectorMetrics use the same metricGrpName, "consumer-metrics", and SelectorMetrics calls metrics.removeMetric() in its close(). Because KafkaConsumerMetrics does not remove the metrics it added during initialization, SelectorMetrics's close() eventually triggers JmxReporter.java:101, which re-registers the deleted MBean named "consumer-metrics". We could either change the metricGrpName of SelectorMetrics to "consumer-selector-metrics", or adjust the closing order of components in KafkaConsumer#close() and move Utils.closeQuietly(metrics, ...) after AppInfoParser.unregisterAppInfo(...). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean
[ https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035929#comment-17035929 ] Junyong Li edited comment on KAFKA-9504 at 2/13/20 5:29 AM: I may have found the cause: KafkaConsumerMetrics and SelectorMetrics use the same metricGrpName, "consumer-metrics", and SelectorMetrics calls metrics.removeMetric() in its close(). Because KafkaConsumerMetrics does not remove the metrics it added during initialization, SelectorMetrics's close() eventually triggers JmxReporter.java:101, which re-registers the deleted MBean named "consumer-metrics". We could either change the metricGrpName of SelectorMetrics to "consumer-selector-metrics", or adjust the closing order of components in KafkaConsumer#close() and move Utils.closeQuietly(metrics, ...) after AppInfoParser.unregisterAppInfo(...). was (Author: lijy83): I may have found the cause: KafkaConsumerMetrics and SelectorMetrics use the same metricGrpName, "consumer-metrics", and SelectorMetrics calls metrics.removeMetric() in its close(). Because KafkaConsumerMetrics does not remove the metrics it added during initialization, this eventually triggers JmxReporter.java:101, which re-registers the deleted MBean named "consumer-metrics". We could either change the metricGrpName of SelectorMetrics to "consumer-selector-metrics", or adjust the closing order of components in KafkaConsumer#close() and move Utils.closeQuietly(metrics, ...) after AppInfoParser.unregisterAppInfo(...). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean
[ https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035929#comment-17035929 ] Junyong Li commented on KAFKA-9504: --- I may have found the cause: KafkaConsumerMetrics and SelectorMetrics use the same metricGrpName, "consumer-metrics", and SelectorMetrics calls metrics.removeMetric() in its close(). Because KafkaConsumerMetrics does not remove the metrics it added during initialization, this eventually triggers JmxReporter.java:101, which re-registers the deleted MBean named "consumer-metrics". We could either change the metricGrpName of SelectorMetrics to "consumer-selector-metrics", or adjust the closing order of components in KafkaConsumer#close() and move Utils.closeQuietly(metrics, ...) after AppInfoParser.unregisterAppInfo(...). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values
[ https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035927#comment-17035927 ] Michael Viamari commented on KAFKA-9533: Sure. I can take a look at it. Before I get started: 1) How should I think about the case where someone might unintentionally (or intentionally) be relying on the buggy behavior? Is that something I should handle in code, or is it handled elsewhere? 2) I noticed that KStream#transform uses an adaptor to conform to KStream#flatTransform. Should I do something similar for KStream#transformValues and KStream#flatTransformValues, since I will be modifying that area of code already? > ValueTransform forwards `null` values > - > > Key: KAFKA-9533 > URL: https://issues.apache.org/jira/browse/KAFKA-9533 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.4.0 > Reporter: Michael Viamari > Priority: Minor > > According to the documentation for `KStream#transformValues`, nulls returned from `ValueTransformer#transform` are not forwarded (see [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]). However, this does not appear to be the case. In `KStreamTransformValuesProcessor#process` the result of the transform is forwarded directly. > {code:java} > @Override > public void process(final K key, final V value) { > context.forward(key, valueTransformer.transform(key, value)); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
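The fix being discussed can be sketched without any Streams dependencies. The stand-in below simulates ValueTransformer and context.forward() with plain Java (all names here are illustrative, not Kafka Streams code) and shows the null check that `KStreamTransformValuesProcessor#process` appears to be missing:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Minimal stand-in for the Streams processor. "transformer" plays the role of
// ValueTransformer#transform; the returned list stands in for what
// context.forward() would emit downstream.
public class NullFilteringDemo {

    static <K, V, VR> List<VR> process(List<K> keys, List<V> values,
                                       BiFunction<K, V, VR> transformer) {
        List<VR> forwarded = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            VR newValue = transformer.apply(keys.get(i), values.get(i));
            if (newValue != null) {       // the check the current code lacks
                forwarded.add(newValue);  // i.e. context.forward(key, newValue)
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // The second record transforms to null and is dropped, matching the javadoc.
        List<String> out = process(List.of("k1", "k2"), List.of("v1", "tombstone"),
                (k, v) -> "tombstone".equals(v) ? null : k + ":" + v);
        System.out.println(out); // [k1:v1]
    }
}
```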
[jira] [Assigned] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
[ https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-9545: -- Assignee: Boyang Chen > Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted` > -- > > Key: KAFKA-9545 > URL: https://issues.apache.org/jira/browse/KAFKA-9545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Jason Gustafson >Assignee: Boyang Chen >Priority: Major > > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/ > {code} > java.lang.AssertionError: Condition not met within timeout 15000. Stream > tasks not updated > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size
[ https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035922#comment-17035922 ] zhangzhisheng commented on KAFKA-6834: -- In kafka_2.12-0.11.0.3, the log cleaner thread stopped after throwing the following exception: {code:java} java.lang.IllegalStateException: This log contains a message larger than maximum allowable size of 100. {code} > log cleaner should handle the case when the size of a message set is larger than the max message size > Key: KAFKA-6834 > URL: https://issues.apache.org/jira/browse/KAFKA-6834 > Project: Kafka > Issue Type: Bug > Reporter: Jun Rao > Assignee: Rajini Sivaram > Priority: Major > Fix For: 2.0.0 > > In KAFKA-5316, we added the logic to allow a message (set) larger than the per topic message size to be written to the log during log cleaning. However, the buffer size in the log cleaner is still bounded by the per topic message size. This can cause the log cleaner to die and cause the broker to run out of disk space. -- This message was sent by Atlassian Jira (v8.3.4#803005)
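The buffer-sizing problem described in the ticket can be illustrated with a self-contained sketch. This is not the actual LogCleaner code; the names (growFor, the size limits) are made up for illustration, but it shows the grow-and-retry idea that keeps the cleaner thread alive when a record batch exceeds the configured read buffer:

```java
import java.nio.ByteBuffer;

// Hedged sketch: if a batch is larger than the current read buffer, grow the
// buffer (capped at a hard limit) and retry, instead of throwing
// IllegalStateException and killing the cleaner thread.
public class CleanerBufferDemo {
    static ByteBuffer growFor(ByteBuffer buffer, int batchSize, int absoluteMax) {
        if (batchSize <= buffer.capacity())
            return buffer; // batch already fits
        if (batchSize > absoluteMax)
            throw new IllegalStateException(
                "batch of " + batchSize + " bytes exceeds hard limit " + absoluteMax);
        // Double the capacity until the batch fits, capped at the hard limit.
        int newSize = buffer.capacity();
        while (newSize < batchSize)
            newSize = Math.min(newSize * 2, absoluteMax);
        return ByteBuffer.allocate(newSize);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(1024); // e.g. sized from max.message.bytes
        buf = growFor(buf, 5000, 1 << 20);          // oversized batch encountered
        System.out.println(buf.capacity());         // 8192
    }
}
```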
[jira] [Commented] (KAFKA-9441) Refactor commit logic
[ https://issues.apache.org/jira/browse/KAFKA-9441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035875#comment-17035875 ] ASF GitHub Bot commented on KAFKA-9441: --- mjsax commented on pull request #8105: KAFKA-9441: Add internal StreamsProducer URL: https://github.com/apache/kafka/pull/8105 Upfront refactoring for KIP-447. Introduces `StreamsProducer`, which allows sharing a producer across multiple tasks and tracking the TX status. Call for review @guozhangwang @abbccdda This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor commit logic > - > > Key: KAFKA-9441 > URL: https://issues.apache.org/jira/browse/KAFKA-9441 > Project: Kafka > Issue Type: Sub-task > Components: streams > Reporter: Matthias J. Sax > Assignee: Matthias J. Sax > Priority: Major > > With a producer per thread in combination with EOS, it is no longer possible to commit individual tasks independently (as done currently). > We need to refactor StreamThread to commit all tasks at the same time under the new model. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9541) Flaky Test DescribeConsumerGroupTest#testDescribeGroupMembersWithShortInitializationTimeout
[ https://issues.apache.org/jira/browse/KAFKA-9541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035028#comment-17035028 ] huxihx edited comment on KAFKA-9541 at 2/13/20 2:14 AM: Occasionally the captured exception is DisconnectException instead of TimeoutException. That might be due to an unexpected long pause that caused the node disconnection. was (Author: huxi_2b): Occasionally the captured exception is DisconnectedException instead of TimeoutException. That might be due to an unexpected long pause that caused the node disconnection. > Flaky Test > DescribeConsumerGroupTest#testDescribeGroupMembersWithShortInitializationTimeout > --- > > Key: KAFKA-9541 > URL: https://issues.apache.org/jira/browse/KAFKA-9541 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.4.0 >Reporter: huxihx >Assignee: huxihx >Priority: Major > > h3. Error Message > java.lang.AssertionError: assertion failed > h3. Stacktrace > java.lang.AssertionError: assertion failed at > scala.Predef$.assert(Predef.scala:267) at > kafka.admin.DescribeConsumerGroupTest.testDescribeGroupMembersWithShortInitializationTimeout(DescribeConsumerGroupTest.scala:630) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at jdk.internal.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
> at java.base/java.lang.reflect.Method.invoke(Method.java:566) at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118) > at jdk.internal.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) at >
[jira] [Commented] (KAFKA-7061) Enhanced log compaction
[ https://issues.apache.org/jira/browse/KAFKA-7061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035829#comment-17035829 ] Senthilnathan Muthusamy commented on KAFKA-7061: Opened PR - rebased with the latest apache/trunk [https://github.com/apache/kafka/pull/8103] > Enhanced log compaction > --- > > Key: KAFKA-7061 > URL: https://issues.apache.org/jira/browse/KAFKA-7061 > Project: Kafka > Issue Type: Improvement > Components: core > Affects Versions: 2.5.0 > Reporter: Luis Cabral > Assignee: Senthilnathan Muthusamy > Priority: Major > Labels: kip > > Enhance log compaction to support more than just offset comparison, so the insertion order isn't dictating which records to keep. > Default behavior is kept as it was, with the enhanced approach having to be purposely activated. > The enhanced compaction is done either via the record timestamp, by setting the new configuration to "timestamp", or via the record headers, by setting this configuration to anything other than the default "offset" or the reserved "timestamp". > See [KIP-280|https://cwiki.apache.org/confluence/display/KAFKA/KIP-280%3A+Enhanced+log+compaction] for more details. > +From Guozhang:+ We should emphasize on the WIKI that the newly introduced config yields to the existing "log.cleanup.policy", i.e. if the latter's value is `delete` not `compact`, then the new config would be ignored. > +From Jun Rao:+ With the timestamp/header strategy, the behavior of the application may need to change. In particular, the application can't just blindly take the record with a larger offset and assume that it is the value to keep. It needs to check the timestamp or the header now. So, it would be useful to at least document this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
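Per the ticket text and KIP-280, the strategy would be selected by a single configuration whose value is either "offset" (default), "timestamp", or the name of a record header. A tentative sketch of what the topic configuration might look like; the key name `compaction.strategy` is taken from the KIP proposal and is not available in any released Kafka version, so treat it as an assumption:

```properties
# Tentative, per KIP-280 (proposed, not released).
# Only honored when the cleanup policy is "compact".
log.cleanup.policy=compact
# "offset" (default), "timestamp", or a record header key such as "version"
compaction.strategy=version
```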
[jira] [Commented] (KAFKA-7061) Enhanced log compaction
[ https://issues.apache.org/jira/browse/KAFKA-7061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035828#comment-17035828 ] ASF GitHub Bot commented on KAFKA-7061: --- senthilm-ms commented on pull request #7528: KAFKA-7061: KIP-280 Enhanced log compaction URL: https://github.com/apache/kafka/pull/7528 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
Jason Gustafson created KAFKA-9545: -- Summary: Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted` Key: KAFKA-9545 URL: https://issues.apache.org/jira/browse/KAFKA-9545 Project: Kafka Issue Type: Bug Components: streams Reporter: Jason Gustafson https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/ {code} java.lang.AssertionError: Condition not met within timeout 15000. Stream tasks not updated at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337) at org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9475) Replace transaction abortion scheduler with a delayed queue
[ https://issues.apache.org/jira/browse/KAFKA-9475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9475: --- Labels: (was: help-wanted) > Replace transaction abortion scheduler with a delayed queue > --- > > Key: KAFKA-9475 > URL: https://issues.apache.org/jira/browse/KAFKA-9475 > Project: Kafka > Issue Type: Sub-task > Reporter: Boyang Chen > Priority: Major > > Although we could try setting the txn timeout to 10 seconds, the purging scheduler only runs at a one-minute interval, so in the worst case we would still wait for 1 minute. We are considering several potential fixes: > # Change the interval to 10 seconds: this means 6X more frequent checking and more read contention on the txn metadata. The benefit is an easy one-line fix with no correctness concern. > # Use an existing delayed queue, a.k.a. the purgatory. From what I heard, the purgatory needs at least 2 extra threads to work properly, with some added overhead in memory and complexity. The benefit is a more precise timeout reaction, without a redundant full metadata read lock. > # Create a new delayed queue. This could be done with a Scala delayed queue; the concern is whether that approach is production ready. Benefits are the same as #2, with potentially less code complexity. > This ticket is to track #2 progress if we decide to go down this path eventually. -- This message was sent by Atlassian Jira (v8.3.4#803005)
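Options 2 and 3 in the ticket amount to replacing the fixed one-minute scan with a deadline-ordered queue: each transaction is enqueued with its own expiry, and a worker blocks until the earliest deadline passes. A minimal, illustrative sketch using java.util.concurrent.DelayQueue (TxnExpiry and the millisecond values are hypothetical, not Kafka code):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hedged sketch of the delayed-queue idea: no periodic scan over all
// transactional ids, and no full metadata read lock on every pass.
public class TxnDelayQueueDemo {
    static final class TxnExpiry implements Delayed {
        final String transactionalId;
        final long deadlineMs;

        TxnExpiry(String transactionalId, long timeoutMs) {
            this.transactionalId = transactionalId;
            this.deadlineMs = System.currentTimeMillis() + timeoutMs;
        }

        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineMs - System.currentTimeMillis(),
                                TimeUnit.MILLISECONDS);
        }

        @Override public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                                other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<TxnExpiry> queue = new DelayQueue<>();
        queue.put(new TxnExpiry("txn-b", 50));   // shorter timeout, expires first
        queue.put(new TxnExpiry("txn-a", 200));
        // take() blocks only until the earliest deadline passes.
        System.out.println("abort " + queue.take().transactionalId);
    }
}
```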
[jira] [Resolved] (KAFKA-9204) ReplaceField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-9204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-9204. -- Fix Version/s: 2.4.1 2.3.2 2.5.0 2.2.3 Reviewer: Randall Hauch Resolution: Fixed Merged to the `trunk`, `2.5`, `2.4`, `2.3`, and `2.2` branches. > ReplaceField transformation fails when encountering tombstone event > --- > > Key: KAFKA-9204 > URL: https://issues.apache.org/jira/browse/KAFKA-9204 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Georgios Kalogiros >Priority: Major > Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1 > > > When applying the {{ReplaceField}} transformation to a tombstone event, an > exception is raised: > > {code:java} > org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error > handler > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:506) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.kafka.connect.errors.DataException: Only Map objects > supported in absence of schema for [field replacement], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.ReplaceField.applySchemaless(ReplaceField.java:134) > at > org.apache.kafka.connect.transforms.ReplaceField.apply(ReplaceField.java:127) > at > org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) > ... 14 more > {code} > There was a similar bug for the InsertField transformation that got merged in > recently: > https://issues.apache.org/jira/browse/KAFKA-8523 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
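The failure above comes from `requireMap` being called on a null (tombstone) value. A minimal, hypothetical sketch of the tombstone-safe behavior the fix introduces, modeling a schemaless record value as a plain `Map` rather than using the real Kafka Connect `ReplaceField` API:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a schemaless ReplaceField-style transform should
// pass a null value (a tombstone) through untouched instead of calling
// requireMap on it. Class and method names here are hypothetical.
public class TombstoneSafeReplaceField {
    // Drops the named field from a schemaless (Map) value; null values pass through.
    public static Map<String, Object> apply(Map<String, Object> value, String fieldToDrop) {
        if (value == null) {
            return null; // tombstone: nothing to transform
        }
        Map<String, Object> updated = new HashMap<>(value);
        updated.remove(fieldToDrop);
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Object> v = new HashMap<>();
        v.put("keep", 1);
        v.put("drop", 2);
        System.out.println(apply(v, "drop"));    // {keep=1}
        System.out.println(apply(null, "drop")); // null
    }
}
```

The null check happens before any schema/schemaless dispatch, which is why the pre-fix code path reached `requireMap` and threw.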
[jira] [Updated] (KAFKA-9475) Replace transaction abortion scheduler with a delayed queue
[ https://issues.apache.org/jira/browse/KAFKA-9475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-9475: --- Labels: help-wanted (was: ) > Replace transaction abortion scheduler with a delayed queue > --- > > Key: KAFKA-9475 > URL: https://issues.apache.org/jira/browse/KAFKA-9475 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Labels: help-wanted > > Although we could try setting the txn timeout to 10 seconds, the purging > scheduler only runs at a one-minute interval, so in the worst case we would > still wait for 1 minute. We are considering several potential fixes: > # Change the interval to 10 seconds: this means 6X more frequent checking and > more read contention on txn metadata. The benefit is an easy one-line > fix without correctness concerns. > # Use an existing delayed queue, a.k.a. the purgatory. From what I have heard, the > purgatory needs at least 2 extra threads to work properly, with some added > memory and complexity overhead. The benefit is a more precise timeout > reaction, without a redundant full metadata read lock. > # Create a new delayed queue. This could be done using the Scala delayed > queue; the concern is whether that approach is production ready. > Benefits are the same as 2, potentially with less code complexity. > This ticket tracks progress on option #2 if we decide to go down this path > eventually.
[jira] [Assigned] (KAFKA-9475) Replace transaction abortion scheduler with a delayed queue
[ https://issues.apache.org/jira/browse/KAFKA-9475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-9475: -- Assignee: (was: Boyang Chen) > Replace transaction abortion scheduler with a delayed queue > --- > > Key: KAFKA-9475 > URL: https://issues.apache.org/jira/browse/KAFKA-9475 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Priority: Major > Labels: help-wanted > > Although we could try setting the txn timeout to 10 seconds, the purging > scheduler only runs at a one-minute interval, so in the worst case we would > still wait for 1 minute. We are considering several potential fixes: > # Change the interval to 10 seconds: this means 6X more frequent checking and > more read contention on txn metadata. The benefit is an easy one-line > fix without correctness concerns. > # Use an existing delayed queue, a.k.a. the purgatory. From what I have heard, the > purgatory needs at least 2 extra threads to work properly, with some added > memory and complexity overhead. The benefit is a more precise timeout > reaction, without a redundant full metadata read lock. > # Create a new delayed queue. This could be done using the Scala delayed > queue; the concern is whether that approach is production ready. > Benefits are the same as 2, potentially with less code complexity. > This ticket tracks progress on option #2 if we decide to go down this path > eventually.
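The delayed-queue idea discussed in the ticket can be sketched with the JDK's own `java.util.concurrent.DelayQueue`: an abort thread wakes up exactly when a transaction's timeout elapses instead of scanning all metadata on a fixed interval. This is an illustrative sketch, not Kafka code; `TxnExpiry` and the timings are invented for the example.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Sketch of tracking transactional ids in a DelayQueue so timeout reaction is
// as precise as the clock, not the scan interval. Names are hypothetical.
public class TxnAbortQueue {
    static final class TxnExpiry implements Delayed {
        final String transactionalId;
        final long deadlineNanos;

        TxnExpiry(String transactionalId, long timeoutMs) {
            this.transactionalId = transactionalId;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<TxnExpiry> queue = new DelayQueue<>();
        queue.put(new TxnExpiry("txn-b", 50)); // expires second
        queue.put(new TxnExpiry("txn-a", 10)); // expires first
        // take() blocks until the earliest deadline has passed.
        System.out.println(queue.take().transactionalId); // txn-a
        System.out.println(queue.take().transactionalId); // txn-b
    }
}
```

Compared with option #1 in the ticket, no periodic full read of the transaction metadata is needed; the trade-off is the extra thread blocked on `take()`.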
[jira] [Commented] (KAFKA-9544) Flaky Test `KafkaAdminClientTest.testDefaultApiTimeoutOverride`
[ https://issues.apache.org/jira/browse/KAFKA-9544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035767#comment-17035767 ] ASF GitHub Bot commented on KAFKA-9544: --- hachikuji commented on pull request #8101: KAFKA-9544; Fix flaky test `AdminClientTest.testDefaultApiTimeoutOverride` URL: https://github.com/apache/kafka/pull/8101 There is a race condition between the backoff sleep in the test case and the setting of the next allowed send time in the AdminClient. To fix it, we allow the test case to perform the backoff sleep multiple times if needed. The failure was fairly easy to reproduce prior to this fix; with the fix, I could not reproduce the problem after 500 runs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flaky Test `KafkaAdminClientTest.testDefaultApiTimeoutOverride` > --- > > Key: KAFKA-9544 > URL: https://issues.apache.org/jira/browse/KAFKA-9544 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > {code} > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > at java.lang.Object.wait(Native Method) > at java.lang.Thread.join(Thread.java:1260) > at > org.apache.kafka.clients.admin.KafkaAdminClient.close(KafkaAdminClient.java:594) > at org.apache.kafka.clients.admin.Admin.close(Admin.java:98) > at org.apache.kafka.clients.admin.Admin.close(Admin.java:81) > at > org.apache.kafka.clients.admin.AdminClientUnitTestEnv.close(AdminClientUnitTestEnv.java:116) > at > org.apache.kafka.clients.admin.KafkaAdminClientTest.testApiTimeout(KafkaAdminClientTest.java:2642) > at > org.apache.kafka.clients.admin.KafkaAdminClientTest.testDefaultApiTimeoutOverride(KafkaAdminClientTest.java:2595) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) > at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
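The idea behind the fix described in the PR comment above (sleeping the retry backoff multiple times rather than once) can be sketched as follows. `MockTime` here is a hypothetical stand-in, not Kafka's actual test utility:

```java
// Sketch: a single backoff sleep can race with the client moving its
// next-allowed-send deadline, so the test loops until the deadline has
// actually passed. All names and timings are illustrative.
public class BackoffUntilReady {
    static final class MockTime {
        private long nowMs = 0;
        long milliseconds() { return nowMs; }
        void sleep(long ms) { nowMs += ms; }
    }

    // Returns how many backoff sleeps were needed before nextAllowedSendMs passed.
    static int awaitSendAllowed(MockTime time, long nextAllowedSendMs, long retryBackoffMs) {
        int sleeps = 0;
        while (time.milliseconds() < nextAllowedSendMs) {
            time.sleep(retryBackoffMs);
            sleeps++;
        }
        return sleeps;
    }

    public static void main(String[] args) {
        MockTime time = new MockTime();
        // Deadline at 250 ms with a 100 ms backoff: sleeps to 100, 200, 300.
        System.out.println(awaitSendAllowed(time, 250, 100)); // 3
    }
}
```

Looping makes the test insensitive to exactly when the client sets the deadline, which is the essence of removing the flakiness.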
[jira] [Created] (KAFKA-9544) Flaky Test `KafkaAdminClientTest.testDefaultApiTimeoutOverride`
Jason Gustafson created KAFKA-9544: -- Summary: Flaky Test `KafkaAdminClientTest.testDefaultApiTimeoutOverride` Key: KAFKA-9544 URL: https://issues.apache.org/jira/browse/KAFKA-9544 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson {code} org.junit.runners.model.TestTimedOutException: test timed out after 12 milliseconds at java.lang.Object.wait(Native Method) at java.lang.Thread.join(Thread.java:1260) at org.apache.kafka.clients.admin.KafkaAdminClient.close(KafkaAdminClient.java:594) at org.apache.kafka.clients.admin.Admin.close(Admin.java:98) at org.apache.kafka.clients.admin.Admin.close(Admin.java:81) at org.apache.kafka.clients.admin.AdminClientUnitTestEnv.close(AdminClientUnitTestEnv.java:116) at org.apache.kafka.clients.admin.KafkaAdminClientTest.testApiTimeout(KafkaAdminClientTest.java:2642) at org.apache.kafka.clients.admin.KafkaAdminClientTest.testDefaultApiTimeoutOverride(KafkaAdminClientTest.java:2595) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9204) ReplaceField transformation fails when encountering tombstone event
[ https://issues.apache.org/jira/browse/KAFKA-9204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035764#comment-17035764 ] ASF GitHub Bot commented on KAFKA-9204: --- rhauch commented on pull request #7731: KAFKA-9204: allow ReplaceField SMT to handle tombstone records URL: https://github.com/apache/kafka/pull/7731 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ReplaceField transformation fails when encountering tombstone event > --- > > Key: KAFKA-9204 > URL: https://issues.apache.org/jira/browse/KAFKA-9204 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Georgios Kalogiros >Priority: Major > > When applying the {{ReplaceField}} transformation to a tombstone event, an > exception is raised: > > {code:java} > org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error > handler > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:506) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) > at 
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.kafka.connect.errors.DataException: Only Map objects > supported in absence of schema for [field replacement], found: null > at > org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38) > at > org.apache.kafka.connect.transforms.ReplaceField.applySchemaless(ReplaceField.java:134) > at > org.apache.kafka.connect.transforms.ReplaceField.apply(ReplaceField.java:127) > at > org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) > ... 14 more > {code} > There was a similar bug for the InsertField transformation that got merged in > recently: > https://issues.apache.org/jira/browse/KAFKA-8523 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9192) NullPointerException if field in schema not present in value
[ https://issues.apache.org/jira/browse/KAFKA-9192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-9192. -- Fix Version/s: 2.4.1 2.3.2 2.5.0 2.2.3 Reviewer: Randall Hauch Resolution: Fixed Merged to the `trunk`, `2.5`, `2.4`, `2.3`, and `2.2` branches. > NullPointerException if field in schema not present in value > > > Key: KAFKA-9192 > URL: https://issues.apache.org/jira/browse/KAFKA-9192 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.1 >Reporter: Mark Tinsley >Priority: Major > Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1 > > > Given a message: > {code:java} > { >"schema":{ > "type":"struct", > "fields":[ > { > "type":"string", > "optional":true, > "field":"abc" > } > ], > "optional":false, > "name":"foobar" >}, >"payload":{ >} > } > {code} > Given that the field is optional, I would expect the JsonConverter to still > process this value. > Instead, I get a null pointer exception; the stack trace points to this > line: > https://github.com/apache/kafka/blob/2.1/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L701 > called by > https://github.com/apache/kafka/blob/2.1/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L181 > The issue seems to be that we need to check whether the jsonValue itself is null > before checking whether the jsonValue holds a null value.
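The fix the ticket suggests, check whether the extracted JSON node is absent before asking the node whether it holds a JSON null, can be sketched with a simplified model. The real code operates on Jackson's `JsonNode` in `JsonConverter`; the `Map`-based lookup below is only illustrative:

```java
import java.util.Map;

// Sketch: an optional field that is absent from the payload should convert to
// null, not throw a NullPointerException. Names here are hypothetical.
public class OptionalFieldLookup {
    // Returns the field's value, treating "absent" the same as "explicit null".
    static Object convertOptionalField(Map<String, Object> payload, String field) {
        Object node = payload.get(field);
        if (node == null) {
            return null; // field absent: the NPE was thrown here before the fix
        }
        return node;
    }

    public static void main(String[] args) {
        System.out.println(convertOptionalField(Map.of("abc", "x"), "abc")); // x
        System.out.println(convertOptionalField(Map.of(), "abc"));           // null
    }
}
```

The ordering matters: dereferencing the node (e.g. asking it for its value) before the absence check is exactly the NPE path the stack trace shows.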
[jira] [Resolved] (KAFKA-9417) Integration test for new EOS model with vanilla Producer and Consumer
[ https://issues.apache.org/jira/browse/KAFKA-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-9417. -- Resolution: Fixed > Integration test for new EOS model with vanilla Producer and Consumer > - > > Key: KAFKA-9417 > URL: https://issues.apache.org/jira/browse/KAFKA-9417 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > We would like to extend the `TransactionMessageCopier` to use the new > subscription mode consumer and build a system test on top of that to > verify that the new semantics actually work. > We also want to make sure backward compatibility is maintained by using the > group metadata API in existing tests as well. > > A minor public change is also included within this PR: setting the > `transaction.abort.timed.out.transaction.cleanup.interval.ms` default to > 10000 ms (10 seconds) on the broker side
[jira] [Updated] (KAFKA-9417) Integration test for new EOS model with vanilla Producer and Consumer
[ https://issues.apache.org/jira/browse/KAFKA-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-9417: - Fix Version/s: 2.5.0 > Integration test for new EOS model with vanilla Producer and Consumer > - > > Key: KAFKA-9417 > URL: https://issues.apache.org/jira/browse/KAFKA-9417 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.5.0 > > > We would like to extend the `TransactionMessageCopier` to use the new > subscription mode consumer and build a system test on top of that to > verify that the new semantics actually work. > We also want to make sure backward compatibility is maintained by using the > group metadata API in existing tests as well. > > A minor public change is also included within this PR: setting the > `transaction.abort.timed.out.transaction.cleanup.interval.ms` default to > 10000 ms (10 seconds) on the broker side
[jira] [Commented] (KAFKA-9192) NullPointerException if field in schema not present in value
[ https://issues.apache.org/jira/browse/KAFKA-9192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035732#comment-17035732 ] ASF GitHub Bot commented on KAFKA-9192: --- rhauch commented on pull request #7733: KAFKA-9192: fix NPE when for converting optional json schema in structs URL: https://github.com/apache/kafka/pull/7733 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > NullPointerException if field in schema not present in value > > > Key: KAFKA-9192 > URL: https://issues.apache.org/jira/browse/KAFKA-9192 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.1 >Reporter: Mark Tinsley >Priority: Major > > Given a message: > {code:java} > { >"schema":{ > "type":"struct", > "fields":[ > { > "type":"string", > "optional":true, > "field":"abc" > } > ], > "optional":false, > "name":"foobar" >}, >"payload":{ >} > } > {code} > Given that the field is optional, I would expect the JsonConverter to still > process this value. > Instead, I get a null pointer exception; the stack trace points to this > line: > https://github.com/apache/kafka/blob/2.1/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L701 > called by > https://github.com/apache/kafka/blob/2.1/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L181 > The issue seems to be that we need to check whether the jsonValue itself is null > before checking whether the jsonValue holds a null value.
[jira] [Commented] (KAFKA-9521) Flaky Test DeleteConsumerGroupsTest#testDeleteCmdEmptyGroup
[ https://issues.apache.org/jira/browse/KAFKA-9521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035693#comment-17035693 ] Matthias J. Sax commented on KAFKA-9521: Different test method but same error message. I am not creating a new ticket because we already have many tickets for different methods of this test, and they all seem to be related. [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/659/testReport/junit/kafka.admin/DeleteConsumerGroupsTest/testDeleteWithMixOfSuccessAndError/] > Flaky Test DeleteConsumerGroupsTest#testDeleteCmdEmptyGroup > --- > > Key: KAFKA-9521 > URL: https://issues.apache.org/jira/browse/KAFKA-9521 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test > > Failed for me locally. Lost the actual stack trace/output, but the error was > "java.lang.AssertionError: The group did not become empty as expected"
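The "group did not become empty as expected" failure is the classic shape of asserting on state that propagates asynchronously. A hedged sketch of the condition-polling pattern such tests typically rely on (names are illustrative, not Kafka's `TestUtils` API):

```java
import java.util.function.BooleanSupplier;

// Sketch: poll a condition until a deadline instead of asserting once, since
// group emptiness only becomes visible after the coordinator processes the
// member departures.
public class WaitUntil {
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pollMs);
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Condition becomes true after ~50 ms, mimicking slow propagation.
        boolean ok = waitUntilTrue(() -> System.currentTimeMillis() - start > 50, 1000, 10);
        System.out.println(ok); // true
    }
}
```

When such a wait still times out under CI load, the usual remedies are a longer timeout or removing the race entirely, as in the KAFKA-9544 fix elsewhere in this digest.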
[jira] [Updated] (KAFKA-9521) Flaky Test DeleteConsumerGroupsTest#testDeleteCmdEmptyGroup
[ https://issues.apache.org/jira/browse/KAFKA-9521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9521: --- Component/s: unit tests > Flaky Test DeleteConsumerGroupsTest#testDeleteCmdEmptyGroup > --- > > Key: KAFKA-9521 > URL: https://issues.apache.org/jira/browse/KAFKA-9521 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test > > Failed for me locally. Lost the actual stack trace/output but the error was > "java.lang.AssertionError: The group did not become empty as expected" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9530) Flaky Test kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout
[ https://issues.apache.org/jira/browse/KAFKA-9530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035692#comment-17035692 ] Matthias J. Sax commented on KAFKA-9530: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/659/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeGroupWithShortInitializationTimeout/] > Flaky Test > kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout > > > Key: KAFKA-9530 > URL: https://issues.apache.org/jira/browse/KAFKA-9530 > Project: Kafka > Issue Type: Test > Components: core >Reporter: Bill Bejeck >Priority: Major > Labels: flaky-test, test > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4570/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeGroupWithShortInitializationTimeout/] > > {noformat} > Error Messagejava.lang.AssertionError: assertion > failedStacktracejava.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:267) > at > kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout(DescribeConsumerGroupTest.scala:585) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at jdk.internal.reflect.GeneratedMethodAccessor28.invoke(Unknown Source) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at >
[jira] [Commented] (KAFKA-9538) Flaky Test `testResetOffsetsExportImportPlan`
[ https://issues.apache.org/jira/browse/KAFKA-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035691#comment-17035691 ] Matthias J. Sax commented on KAFKA-9538: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/659/testReport/junit/kafka.admin/ResetConsumerGroupOffsetTest/testResetOffsetsAllTopicsAllGroups/] > Flaky Test `testResetOffsetsExportImportPlan` > - > > Key: KAFKA-9538 > URL: https://issues.apache.org/jira/browse/KAFKA-9538 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: huxihx >Priority: Major > > {code} > 19:44:41 > 19:44:41 kafka.admin.ResetConsumerGroupOffsetTest > > testResetOffsetsExportImportPlan FAILED > 19:44:41 java.lang.AssertionError: expected: 2, bar2-1 -> > 2)> but was: > 19:44:41 at org.junit.Assert.fail(Assert.java:89) > 19:44:41 at org.junit.Assert.failNotEquals(Assert.java:835) > 19:44:41 at org.junit.Assert.assertEquals(Assert.java:120) > 19:44:41 at org.junit.Assert.assertEquals(Assert.java:146) > 19:44:41 at > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan(ResetConsumerGroupOffsetTest.scala:429) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9541) Flaky Test DescribeConsumerGroupTest#testDescribeGroupMembersWithShortInitializationTimeout
[ https://issues.apache.org/jira/browse/KAFKA-9541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035690#comment-17035690 ] Matthias J. Sax commented on KAFKA-9541: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/659/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeGroupMembersWithShortInitializationTimeout/] > Flaky Test > DescribeConsumerGroupTest#testDescribeGroupMembersWithShortInitializationTimeout > --- > > Key: KAFKA-9541 > URL: https://issues.apache.org/jira/browse/KAFKA-9541 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.4.0 >Reporter: huxihx >Assignee: huxihx >Priority: Major > > h3. Error Message > java.lang.AssertionError: assertion failed > h3. Stacktrace > java.lang.AssertionError: assertion failed at > scala.Predef$.assert(Predef.scala:267) at > kafka.admin.DescribeConsumerGroupTest.testDescribeGroupMembersWithShortInitializationTimeout(DescribeConsumerGroupTest.scala:630) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at > 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at jdk.internal.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118) > at jdk.internal.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) at >
[jira] [Commented] (KAFKA-8110) Flaky Test DescribeConsumerGroupTest#testDescribeMembersWithConsumersWithoutAssignedPartitions
[ https://issues.apache.org/jira/browse/KAFKA-8110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035689#comment-17035689 ] Matthias J. Sax commented on KAFKA-8110: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/659/testReport/junit/kafka.admin/DeleteConsumerGroupsTest/testDeleteCmdWithMixOfSuccessAndError/] > Flaky Test > DescribeConsumerGroupTest#testDescribeMembersWithConsumersWithoutAssignedPartitions > -- > > Key: KAFKA-8110 > URL: https://issues.apache.org/jira/browse/KAFKA-8110 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.2.0 >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > Fix For: 2.5.0 > > > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/67/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeMembersWithConsumersWithoutAssignedPartitions/] > {quote}java.lang.AssertionError: Partition [__consumer_offsets,0] metadata > not propagated after 15000 ms at > kafka.utils.TestUtils$.fail(TestUtils.scala:381) at > kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:791) at > kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:880) at > kafka.utils.TestUtils$.$anonfun$createTopic$3(TestUtils.scala:318) at > kafka.utils.TestUtils$.$anonfun$createTopic$3$adapted(TestUtils.scala:317) at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) at > scala.collection.immutable.Range.foreach(Range.scala:158) at > scala.collection.TraversableLike.map(TraversableLike.scala:237) at > scala.collection.TraversableLike.map$(TraversableLike.scala:230) at > scala.collection.AbstractTraversable.map(Traversable.scala:108) at > kafka.utils.TestUtils$.createTopic(TestUtils.scala:317) at > kafka.utils.TestUtils$.createOffsetsTopic(TestUtils.scala:375) at > kafka.admin.DescribeConsumerGroupTest.testDescribeMembersWithConsumersWithoutAssignedPartitions(DescribeConsumerGroupTest.scala:372){quote} > STDOUT > {quote}[2019-03-14 
20:01:52,347] WARN Ignoring unexpected runtime exception > (org.apache.zookeeper.server.NIOServerCnxnFactory:236) > java.nio.channels.CancelledKeyException at > sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73) at > sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87) at > org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205) > at java.lang.Thread.run(Thread.java:748) TOPIC PARTITION CURRENT-OFFSET > LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID foo 0 0 0 0 - - - TOPIC > PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID foo 0 > 0 0 0 - - - COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS > localhost:44669 (0){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9417) Integration test for new EOS model with vanilla Producer and Consumer
[ https://issues.apache.org/jira/browse/KAFKA-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035688#comment-17035688 ] ASF GitHub Bot commented on KAFKA-9417: --- guozhangwang commented on pull request #8000: KAFKA-9417: New Integration Test for KIP-447 URL: https://github.com/apache/kafka/pull/8000 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Integration test for new EOS model with vanilla Producer and Consumer > - > > Key: KAFKA-9417 > URL: https://issues.apache.org/jira/browse/KAFKA-9417 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > We would like to extend the `TransactionMessageCopier` to use the new > subscription-mode consumer and build a system test on top of that in order to > verify that the new semantics actually work. > We also want to make sure backward compatibility is maintained by using the > group metadata API in existing tests as well. > > A minor public change is also included within this PR by setting the > `transaction.abort.timed.out.transaction.cleanup.interval.ms` default to > 10000 ms (10 seconds) on the broker side -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9530) Flaky Test kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout
[ https://issues.apache.org/jira/browse/KAFKA-9530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035684#comment-17035684 ] Matthias J. Sax commented on KAFKA-9530: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4672/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeGroupWithShortInitializationTimeout/] > Flaky Test > kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout > > > Key: KAFKA-9530 > URL: https://issues.apache.org/jira/browse/KAFKA-9530 > Project: Kafka > Issue Type: Test > Components: core >Reporter: Bill Bejeck >Priority: Major > Labels: flaky-test, test > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4570/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeGroupWithShortInitializationTimeout/] > > {noformat} > Error Messagejava.lang.AssertionError: assertion > failedStacktracejava.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:267) > at > kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout(DescribeConsumerGroupTest.scala:585) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at jdk.internal.reflect.GeneratedMethodAccessor28.invoke(Unknown Source) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at >
[jira] [Resolved] (KAFKA-6607) Kafka Streams lag not zero when input topic transactional
[ https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6607. Fix Version/s: 2.5.0 Resolution: Fixed > Kafka Streams lag not zero when input topic transactional > - > > Key: KAFKA-6607 > URL: https://issues.apache.org/jira/browse/KAFKA-6607 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > Fix For: 2.5.0 > > > When an input topic for a Kafka Streams application is written using > transactions, Kafka Streams commits an "incorrect" offset, i.e., it commits > "lastProcessedMessageOffset + 1", which is smaller than "endOffset" if it > reaches the end of the topic. The reason is the commit marker, which is the last > "message" in the topic; Streams does not take commit markers into account > when committing. > This is not a correctness issue, but when one inspects the consumer lag via > {{bin/kafka-consumer-groups.sh}} the lag is shown as 1 instead of 0 – which is > correct from the consumer-group tool's point of view. > Note that all applications using a plain consumer may face the same issue if > they use `KafkaConsumer#commitSync(Map<TopicPartition, OffsetAndMetadata> > offsets)`: to address the issue, the correct pattern is to either commit > "nextRecord.offset()" (if the next record is already available, i.e., was > returned by `poll()`), or to use `consumer.position()`, which takes the commit > marker into account and would "step over" it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
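The off-by-one described in this issue can be sketched with a toy model (plain Python, not the Kafka API; the offsets are made-up example values):

```python
# Toy model of the off-by-one lag on a transactional input topic: the
# last entry in the log is a transaction commit marker, not a record.

def consumer_lag(log_end_offset, committed_offset):
    # Lag as reported by the consumer-group tool: log end offset minus
    # the group's committed offset.
    return log_end_offset - committed_offset

# A partition with 5 data records (offsets 0..4) followed by a commit
# marker at offset 5; the log end offset is therefore 6.
last_processed_offset = 4   # offset of the last real record from poll()
log_end_offset = 6          # one past the commit marker

# Committing lastProcessedMessageOffset + 1 ignores the marker:
naive_commit = last_processed_offset + 1               # -> 5
print(consumer_lag(log_end_offset, naive_commit))      # -> 1, not 0

# consumer.position() "steps over" the marker, so committing it gives 0:
position_commit = log_end_offset                       # -> 6
print(consumer_lag(log_end_offset, position_commit))   # -> 0
```

In real consumer code the fix is the pattern named above: commit `consumer.position()` rather than `lastProcessedMessageOffset + 1`.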
[jira] [Updated] (KAFKA-6607) Kafka Streams lag not zero when input topic transactional
[ https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6607: --- Description: When an input topic for a Kafka Streams application is written using transaction, Kafka Streams commits an "incorrect" offset, ie, it commits "lastProcessedMessageOffset + 1" that is smaller than "endOffset" if it reaches the end of topic. The reason is the commit marker that is the last "message" in the topic; Streams does not take commit markers into account when committing. This is not a correctness issue, but when one inspect the consumer lag via {{bin/kafka-consumer.group.sh}} the lag is shown as 1 instead of 0 – what is correct from consumer-group tool point of view. Note that all applications using a plain consumer may face the same issue if they use `KafkaConsumer#commitSync(Map offsets)`: to address the issue, the correct pattern is to either commit "nextRecord.offset()" (if the next record is available already, ie, was returned by `poll()`, or use `consumer.position()` that takes the commit marker into account and would "step over it"). was: When an input topic for a Kafka Streams application is written using transaction, Kafka Streams commits an "incorrect" offset, ie, it commits "lastProcessedMessageOffset + 1" that is smaller than "endOffset" if it reaches the end of topic. The reason is the commit marker that is the last "message" in the topic; Streams does not take commit markers into account when committing. This is not a correctness issue, but when one inspect the consumer lag via {{bin/kafka-consumer.group.sh}} the lag is shown as 1 instead of 0 – what is correct from consumer-group tool point of view. > Kafka Streams lag not zero when input topic transactional > - > > Key: KAFKA-6607 > URL: https://issues.apache.org/jira/browse/KAFKA-6607 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. 
Sax >Priority: Minor > > When an input topic for a Kafka Streams application is written using > transaction, Kafka Streams commits an "incorrect" offset, ie, it commits > "lastProcessedMessageOffset + 1" that is smaller than "endOffset" if it > reaches the end of topic. The reason is the commit marker that is the last > "message" in the topic; Streams does not take commit markers into account > when committing. > This is not a correctness issue, but when one inspect the consumer lag via > {{bin/kafka-consumer.group.sh}} the lag is shown as 1 instead of 0 – what is > correct from consumer-group tool point of view. > Note that all applications using a plain consumer may face the same issue if > they use `KafkaConsumer#commitSync(Map > offsets)`: to address the issue, the correct pattern is to either commit > "nextRecord.offset()" (if the next record is available already, ie, was > returned by `poll()`, or use `consumer.position()` that takes the commit > marker into account and would "step over it"). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-6607) Kafka Streams lag not zero when input topic transactional
[ https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6607: --- Description: When an input topic for a Kafka Streams application is written using transaction, Kafka Streams commits an "incorrect" offset, ie, it commits "lastProcessedMessageOffset + 1" that is smaller than "endOffset" if it reaches the end of topic. The reason is the commit marker that is the last "message" in the topic; Streams does not take commit markers into account when committing. This is not a correctness issue, but when one inspect the consumer lag via {{bin/kafka-consumer.group.sh}} the lag is shown as 1 instead of 0 – what is correct from consumer-group tool point of view. was: When an input topic for a Consumer or Kafka Streams application is written using transaction, the client does not commit "endOffset" but "endOffset - 1" (or to be more precise, "lastProcessedMessageOffset + 1") if it reaches the end of topic. The reason is the commit marker that is the last "message" in the topic; Streams commit "offset of last processed message plus 1" and does not take commit markers into account. This is not a correctness issue, but when one inspect the consumer lag via {{bin/kafka-consumer.group.sh}} the lag is show as 1 instead of 0 – what is correct from consumer-group tool point of view. > Kafka Streams lag not zero when input topic transactional > - > > Key: KAFKA-6607 > URL: https://issues.apache.org/jira/browse/KAFKA-6607 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > When an input topic for a Kafka Streams application is written using > transaction, Kafka Streams commits an "incorrect" offset, ie, it commits > "lastProcessedMessageOffset + 1" that is smaller than "endOffset" if it > reaches the end of topic. 
The reason is the commit marker that is the last > "message" in the topic; Streams does not take commit markers into account > when committing. > This is not a correctness issue, but when one inspect the consumer lag via > {{bin/kafka-consumer.group.sh}} the lag is shown as 1 instead of 0 – what is > correct from consumer-group tool point of view. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-6607) Kafka Streams lag not zero when input topic transactional
[ https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6607: --- Component/s: (was: clients) > Kafka Streams lag not zero when input topic transactional > - > > Key: KAFKA-6607 > URL: https://issues.apache.org/jira/browse/KAFKA-6607 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > When an input topic for a Consumer or Kafka Streams application is written > using transaction, the client does not commit "endOffset" but "endOffset - 1" > (or to be more precise, "lastProcessedMessageOffset + 1") if it reaches the > end of topic. The reason is the commit marker that is the last "message" in > the topic; Streams commit "offset of last processed message plus 1" and does > not take commit markers into account. > This is not a correctness issue, but when one inspect the consumer lag via > {{bin/kafka-consumer.group.sh}} the lag is show as 1 instead of 0 – what is > correct from consumer-group tool point of view. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-6607) Kafka Streams lag not zero when input topic transactional
[ https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6607: --- Summary: Kafka Streams lag not zero when input topic transactional (was: Consumer Client and Kafka Streams lag not zero when input topic transactional) > Kafka Streams lag not zero when input topic transactional > - > > Key: KAFKA-6607 > URL: https://issues.apache.org/jira/browse/KAFKA-6607 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > When an input topic for a Consumer or Kafka Streams application is written > using transaction, the client does not commit "endOffset" but "endOffset - 1" > (or to be more precise, "lastProcessedMessageOffset + 1") if it reaches the > end of topic. The reason is the commit marker that is the last "message" in > the topic; Streams commit "offset of last processed message plus 1" and does > not take commit markers into account. > This is not a correctness issue, but when one inspect the consumer lag via > {{bin/kafka-consumer.group.sh}} the lag is show as 1 instead of 0 – what is > correct from consumer-group tool point of view. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-6607) Consumer Client and Kafka Streams lag not zero when input topic transactional
[ https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035681#comment-17035681 ] ASF GitHub Bot commented on KAFKA-6607: --- mjsax commented on pull request #8091: KAFKA-6607: Commit correct offsets for transactional input data URL: https://github.com/apache/kafka/pull/8091 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Consumer Client and Kafka Streams lag not zero when input topic transactional > - > > Key: KAFKA-6607 > URL: https://issues.apache.org/jira/browse/KAFKA-6607 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > When an input topic for a Consumer or Kafka Streams application is written > using transaction, the client does not commit "endOffset" but "endOffset - 1" > (or to be more precise, "lastProcessedMessageOffset + 1") if it reaches the > end of topic. The reason is the commit marker that is the last "message" in > the topic; Streams commit "offset of last processed message plus 1" and does > not take commit markers into account. > This is not a correctness issue, but when one inspect the consumer lag via > {{bin/kafka-consumer.group.sh}} the lag is show as 1 instead of 0 – what is > correct from consumer-group tool point of view. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9499) Improve deletion process by batching more aggressively
[ https://issues.apache.org/jira/browse/KAFKA-9499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-9499. Fix Version/s: 2.5.0 Resolution: Fixed > Improve deletion process by batching more aggressively > -- > > Key: KAFKA-9499 > URL: https://issues.apache.org/jira/browse/KAFKA-9499 > Project: Kafka > Issue Type: Improvement >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 2.5.0 > > > I have noticed that the topic deletion process could be improved by batching > topics here: > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/TopicDeletionManager.scala#L354] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9499) Improve deletion process by batching more aggressively
[ https://issues.apache.org/jira/browse/KAFKA-9499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035658#comment-17035658 ] ASF GitHub Bot commented on KAFKA-9499: --- hachikuji commented on pull request #8053: KAFKA-9499; Improve deletion process by batching more aggressively URL: https://github.com/apache/kafka/pull/8053 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve deletion process by batching more aggressively > -- > > Key: KAFKA-9499 > URL: https://issues.apache.org/jira/browse/KAFKA-9499 > Project: Kafka > Issue Type: Improvement >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > > I have noticed that the topic deletion process could be improved by batching > topics here: > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/TopicDeletionManager.scala#L354] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks
[ https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035614#comment-17035614 ] ASF GitHub Bot commented on KAFKA-8187: --- mjsax commented on pull request #6849: KAFKA-8187 Bugfix for branch 2.0 URL: https://github.com/apache/kafka/pull/6849 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > State store record loss across multiple reassignments when using standby tasks > -- > > Key: KAFKA-8187 > URL: https://issues.apache.org/jira/browse/KAFKA-8187 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: William Greer >Assignee: Lifei Chen >Priority: Blocker > Fix For: 2.3.0, 2.1.2, 2.2.1 > > > Overview: > There is a race condition that can cause a partitioned state store to be > missing records up to an offset when using standby tasks. > When a reassignment occurs and a task is migrated to a StandbyTask in another > StreamThread/TaskManager on the same JVM, there can be lock contention that > prevents the StandbyTask on the currently assigned StreamThread from > acquiring the lock and to not retry acquiring the lock because all of the > active StreamTasks are running for that StreamThread. If the StandbyTask does > not acquire the lock before the StreamThread enters into the RUNNING state, > then the StandbyTask will not consume any records. If there is no subsequent > reassignment before the second execution of the stateDirCleaner Thread, then > the task directory for the StandbyTask will be deleted. When the next > reassignment occurs the offset that was read by the StandbyTask at creation > time before acquiring the lock will be written back to the state store > directory, this re-creates the state store directory. 
> An example: > StreamThread(A) and StreamThread(B) are running on the same JVM in the same > streams application. > StreamThread(A) has StandbyTask 1_0 > StreamThread(B) has no tasks > A reassignment is triggered by another host in the streams application fleet. > StreamThread(A) is notified with a PARTITIONS_REVOKED event of the threads > one task > StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby > task for 1_0 > Here begins the race condition. > StreamThread(B) creates the StandbyTask which reads the current checkpoint > from disk. > StreamThread(B) then attempts to updateNewAndRestoringTasks() for it's > assigned tasks. [0] > StreamThread(B) initializes the new tasks for the active and standby tasks. > [1] [2] > StreamThread(B) attempts to lock the state directory for task 1_0 but fails > with a LockException [3], since StreamThread(A) still holds the lock. > StreamThread(B) returns true from updateNewAndRestoringTasks() due to the > check at [4] which only checks that the active assigned tasks are running. > StreamThread(B) state is set to RUNNING > StreamThread(A) closes the previous StandbyTask specifically calling > closeStateManager() [5] > StreamThread(A) state is set to RUNNING > Streams application for this host has completed re-balancing and is now in > the RUNNING state. > State at this point is the following: State directory exists for 1_0 and all > data is present. > Then at a period that is 1 to 2 intervals of [6](which is default of 10 > minutes) after the reassignment had completed the stateDirCleaner thread will > execute [7]. > The stateDirCleaner will then do [8], which finds the directory 1_0, finds > that there isn't an active lock for that directory, acquire the lock, and > deletes the directory. > State at this point is the following: State directory does not exist for 1_0. > When the next reassignment occurs. 
The offset that was read by > StreamThread(B) during construction of the StandbyTask for 1_0 will be > written back to disk. This write re-creates the state store directory and > writes the .checkpoint file with the old offset. > State at this point is the following: State directory exists for 1_0 with a > '.checkpoint' file in it, but there is no other state store data in the > directory. > If this host is assigned the active task for 1_0 then all the history in the > state store will be missing from before the offset that was read at the > previous reassignment. > If this host is assigned the standby task for 1_0 then the lock will be > acquired and the standby will start to consume records, but it will still be > missing all records from before the offset that was read at the
[jira] [Updated] (KAFKA-9535) Metadata not updated when consumer encounters FENCED_LEADER_EPOCH
[ https://issues.apache.org/jira/browse/KAFKA-9535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9535: --- Affects Version/s: (was: 2.5.0) (was: 2.4.0) > Metadata not updated when consumer encounters FENCED_LEADER_EPOCH > - > > Key: KAFKA-9535 > URL: https://issues.apache.org/jira/browse/KAFKA-9535 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.3.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Inside the consumer Fetcher's handling of ListOffsetResponse, if we hit > `FENCED_LEADER_EPOCH` on partition level, the client will blindly retry > without refreshing the metadata, creating a stuck state as the local leader > epoch never gets updated and constantly fails the broker check. > The solution is to trigger metadata update upon receiving retriable errors, > before we kick off another offset list. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8073) Transient failure in kafka.api.UserQuotaTest.testThrottledProducerConsumer
[ https://issues.apache.org/jira/browse/KAFKA-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035598#comment-17035598 ] ASF GitHub Bot commented on KAFKA-8073: --- chia7712 commented on pull request #8098: KAFKA-8073 Transient failure in kafka.api.UserQuotaTest.testThrottled… URL: https://github.com/apache/kafka/pull/8098 The input data in testThrottledProducerConsumer is 1000 messages, and each message carries only the bytes of a string converted from an int value. The quota is 8000 bytes/second, so throttling fails to kick in if the machine is too slow to complete all requests. It is a timing-based test case. The workaround is to increase the size of the input data. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Transient failure in kafka.api.UserQuotaTest.testThrottledProducerConsumer > -- > > Key: KAFKA-8073 > URL: https://issues.apache.org/jira/browse/KAFKA-8073 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.2.0, 2.3.0 >Reporter: Bill Bejeck >Assignee: Chia-Ping Tsai >Priority: Critical > Fix For: 2.2.3, 2.5.0 > > > Failed in build [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20134/] > > Stacktrace and STDOUT > {noformat} > Error Message > java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have > been throttled > Stacktrace > java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have > been throttled > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > kafka.api.QuotaTestClients.verifyThrottleTimeMetric(BaseQuotaTest.scala:229) > at > kafka.api.QuotaTestClients.verifyProduceThrottle(BaseQuotaTest.scala:215) > at > kafka.api.BaseQuotaTest.testThrottledProducerConsumer(BaseQuotaTest.scala:82) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at org.junit.runners.ParentRunner.run(ParentRunner.java:412) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at >
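The timing argument from chia7712's comment above can be sketched numerically (plain Python; only the 8000 bytes/second quota comes from the comment, the per-message size and elapsed times are assumed illustrative values):

```python
# Back-of-the-envelope model of why the quota test is timing-sensitive.

QUOTA_BYTES_PER_SEC = 8000

def is_throttled(total_bytes, elapsed_sec):
    # The broker throttles a client only when its observed byte rate
    # exceeds the configured quota.
    return total_bytes / elapsed_sec > QUOTA_BYTES_PER_SEC

total_bytes = 1000 * 4  # ~1000 messages, each a few bytes of stringified int

# Fast machine: data produced in 0.2 s -> 20000 B/s -> throttled.
print(is_throttled(total_bytes, 0.2))   # True

# Slow CI machine: same data spread over 1.0 s -> 4000 B/s -> never
# throttled, so the "should have been throttled" assertion fails.
print(is_throttled(total_bytes, 1.0))   # False
```

This is why increasing the amount of input data makes the test robust: a larger payload keeps the observed rate above the quota even on a slow machine.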
[jira] [Commented] (KAFKA-9542) ZSTD Compression Not Working
[ https://issues.apache.org/jira/browse/KAFKA-9542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035594#comment-17035594 ] Lucas Bradstreet commented on KAFKA-9542: - Hi Prashant, can you please check whether /tmp is mounted noexec? The native library gets extracted to /tmp and then loaded from there, so a noexec mount is problematic. If it is, you can confirm that this is the problem by adding KAFKA_OPTS to supply -Djava.io.tmpdir=/path/to/other/dir and seeing whether it works with an alternative tmpdir. > ZSTD Compression Not Working > > > Key: KAFKA-9542 > URL: https://issues.apache.org/jira/browse/KAFKA-9542 > Project: Kafka > Issue Type: Bug > Components: compression >Affects Versions: 2.3.0 > Environment: Linux, CentOS >Reporter: Prashant >Priority: Critical > > I enabled zstd compression at the producer by adding "compression.type=zstd" to the > producer config. When I try to run it, the producer fails with > "org.apache.kafka.common.errors.UnknownServerException: The server > experienced an unexpected error when processing the request" > In the broker logs, I could find the following exception: > > [2020-02-12 11:48:04,623] ERROR [ReplicaManager broker=1] Error processing > append operation on partition load_logPlPts-6 (kafka.server.ReplicaManager) > org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: Could > not initialize class > org.apache.kafka.common.record.CompressionType$ZstdConstructors > at > org.apache.kafka.common.record.CompressionType$5.wrapForInput(CompressionType.java:133) > at > org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257) > at > org.apache.kafka.common.record.DefaultRecordBatch.iterator(DefaultRecordBatch.java:324) > at > scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:54) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > 
kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(LogValidator.scala:269) > at > kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(LogValidator.scala:261) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:261) > at > kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:72) > at kafka.log.Log$$anonfun$append$2.liftedTree1$1(Log.scala:869) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868) > at kafka.log.Log$$anonfun$append$2.apply(Log.scala:850) > at kafka.log.Log.maybeHandleIOException(Log.scala:2065) > at kafka.log.Log.append(Log.scala:850) > at kafka.log.Log.appendAsLeader(Log.scala:819) > at kafka.cluster.Partition$$anonfun$14.apply(Partition.scala:771) > at kafka.cluster.Partition$$anonfun$14.apply(Partition.scala:759) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) > > This is fresh broker installed on "CentOS Linux" v7. This doesn't seem to be > a classpath issue as same package is working on MacOS. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
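Following the suggestion above, a quick sketch of how to check for a noexec /tmp and point the JVM at an alternative tmpdir via KAFKA_OPTS (the directory path below is just an example):

```shell
# Check whether /tmp is mounted with the noexec option (Linux).
grep -E '[[:space:]]/tmp[[:space:]]' /proc/mounts | grep -o noexec \
  || echo "/tmp does not appear to be mounted noexec"

# Work around a noexec /tmp by giving the JVM an executable tmpdir
# for native-library extraction (directory name is illustrative):
mkdir -p "$HOME/kafka-tmp"
export KAFKA_OPTS="-Djava.io.tmpdir=$HOME/kafka-tmp"
# then start the broker/producer as usual, e.g.:
# bin/kafka-server-start.sh config/server.properties
```

If the error disappears with the alternative tmpdir, the noexec mount was preventing the zstd native library from loading.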
[jira] [Assigned] (KAFKA-8073) Transient failure in kafka.api.UserQuotaTest.testThrottledProducerConsumer
[ https://issues.apache.org/jira/browse/KAFKA-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-8073: - Assignee: Chia-Ping Tsai > Transient failure in kafka.api.UserQuotaTest.testThrottledProducerConsumer > -- > > Key: KAFKA-8073 > URL: https://issues.apache.org/jira/browse/KAFKA-8073 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.2.0, 2.3.0 >Reporter: Bill Bejeck >Assignee: Chia-Ping Tsai >Priority: Critical > Fix For: 2.2.3, 2.5.0 > > > Failed in build [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20134/] > > Stacktrace and STDOUT > {noformat} > Error Message > java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have > been throttled > Stacktrace > java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have > been throttled > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > kafka.api.QuotaTestClients.verifyThrottleTimeMetric(BaseQuotaTest.scala:229) > at > kafka.api.QuotaTestClients.verifyProduceThrottle(BaseQuotaTest.scala:215) > at > kafka.api.BaseQuotaTest.testThrottledProducerConsumer(BaseQuotaTest.scala:82) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at org.junit.runners.ParentRunner.run(ParentRunner.java:412) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at >
[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1703#comment-1703 ] Matthias J. Sax commented on KAFKA-7965: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4665/testReport/junit/kafka.api/ConsumerBounceTest/testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/] > Flaky Test > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > > > Key: KAFKA-7965 > URL: https://issues.apache.org/jira/browse/KAFKA-7965 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 1.1.1, 2.2.0, 2.3.0 >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/] > {quote}java.lang.AssertionError: Received 0, expected at least 68 at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.assertTrue(Assert.java:41) at > kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8037) KTable restore may load bad data
[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035541#comment-17035541 ] Sophie Blee-Goldman commented on KAFKA-8037: Ha, yes good catch I'll fix the typo in 2) That's a good point about later valid records potentially overwriting a corrupted one; I suppose we can't just load the two changelogs one after another, and would have to restore from both at the same time by choosing the next record in the original order. Might be tricky to synchronize but not impossible. I think this should still work for key deserialization errors though; we would still copy the plain key bytes with a null value on restore, which is treated as a delete by the store (without ever having to deserialize the corrupted key) > KTable restore may load bad data > > > Key: KAFKA-8037 > URL: https://issues.apache.org/jira/browse/KAFKA-8037 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: pull-request-available > > If an input topic contains bad data, users can specify a > `deserialization.exception.handler` to drop corrupted records on read. > However, this mechanism may be by-passed on restore. Assume a > `builder.table()` call reads and drops a corrupted record. If the table state > is lost and restored from the changelog topic, the corrupted record may be > copied into the store, because on restore plain bytes are copied. > If the KTable is used in a join, an internal `store.get()` call to lookup the > record would fail with a deserialization exception if the value part cannot > be deserialized. > GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for > GlobalKTable case). It's unclear to me atm, how this issue could be addressed > for KTables though. 
> Note, that user state stores are not affected, because they always have a > dedicated changelog topic (and don't reuse an input topic) and thus the > corrupted record would not be written into the changelog. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-8037) KTable restore may load bad data
[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17034746#comment-17034746 ] Sophie Blee-Goldman edited comment on KAFKA-8037 at 2/12/20 5:38 PM: - [~pkleindl] [~vvcephei] [~mjsax] [~guozhang] I have kind of a different approach that might be a good compromise. We want to satisfy all of the following three goals as much as possible: # reuse the input topic as the changelog to avoid replicating all that data # make restoration as fast as possible by copying plain bytes with_out_ deserializing # don't load bad data, or copy bytes during restoration that weren't copied during normal processing As this ticket points out, we currently sacrifice 3) for the sake of 1) and 2) with the workaround being to give up 1) by turning off optimization. But what if we instead created a new type of internal topic that's essentially an "inverse-changelog" – rather than sending every record that goes to the store to this inverse-changelog, we send only the records that _don't_ get put into the store? In fact we don't even store the entire record, just the key bytes with a null value. Then once we've restored from the input-topic-changelog, we read from the inverse-changelog and any bad records get deleted without ever having to deserialize or store the value twice. My one concern is over the tracking/handling of local stream time: if we load the bad data during the initial restoration, it might bump up the stream time when it shouldn't have and potentially cause older, valid records to get dropped. If that's a real concern, then this approach would be blocked on KAFKA-9368 – however, I'm not confident that corrupted records don't currently bump the stream-time even during normal processing, and also not sure what kind of guarantees we should or do make w.r.t deserialization exceptions. Thoughts? 
was (Author: ableegoldman): [~pkleindl] [~vvcephei] [~mjsax] [~guozhang] I have kind of a different approach that might be a good compromise. We want to satisfy all of the following three goals as much as possible: # reuse the input topic as the changelog to avoid replicating all that data # make restoration as fast as possible by copying plain bytes with deserializing # don't load bad data, or copy bytes during restoration that weren't copied during normal processing As this ticket points out, we currently sacrifice 3) for the sake of 1) and 2) with the workaround being to give up 1) by turning off optimization. But what if we instead created a new type of internal topic that's essentially an "inverse-changelog" – rather than sending every record that goes to the store to this inverse-changelog, we send only the records that _don't_ get put into the store? In fact we don't even store the entire record, just the key bytes with a null value. Then once we've restored from the input-topic-changelog, we read from the inverse-changelog and any bad records get deleted without ever having to deserialize or store the value twice. My one concern is over the tracking/handling of local stream time: if we load the bad data during the initial restoration, it might bump up the stream time when it shouldn't have and potentially cause older, valid records to get dropped. If that's a real concern, then this approach would be blocked on KAFKA-9368 – however, I'm not confident that corrupted records don't currently bump the stream-time even during normal processing, and also not sure what kind of guarantees we should or do make w.r.t deserialization exceptions. Thoughts? > KTable restore may load bad data > > > Key: KAFKA-8037 > URL: https://issues.apache.org/jira/browse/KAFKA-8037 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. 
Sax >Priority: Minor > Labels: pull-request-available > > If an input topic contains bad data, users can specify a > `deserialization.exception.handler` to drop corrupted records on read. > However, this mechanism may be by-passed on restore. Assume a > `builder.table()` call reads and drops a corrupted record. If the table state > is lost and restored from the changelog topic, the corrupted record may be > copied into the store, because on restore plain bytes are copied. > If the KTable is used in a join, an internal `store.get()` call to lookup the > record would fail with a deserialization exception if the value part cannot > be deserialized. > GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for > GlobalKTable case). It's unclear to me atm, how this issue could be addressed > for KTables though. > Note, that user state stores are not affected, because they always have a > dedicated changelog topic (and don't reuse an input
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035517#comment-17035517 ] Evan Williams commented on KAFKA-4084: -- [~sql_consulting] [~junrao] FYI, I implemented separation of replication/controller traffic, and this helped somewhat. However, I think in the end the only real solution is KIP-491. > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. There is an admin CLI command to trigger the preferred replica election > manually. There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. 
It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. > One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian Jira (v8.3.4#803005)
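The "bounded number of in-flight rebalances" idea in the last paragraph can be sketched in plain Java. This is only an illustration of the proposed throttling, not Kafka's controller code; electPreferredLeaders is a hypothetical stand-in for the controller-side election call.

```java
import java.util.List;

class BatchedRebalance {
    // Returns how many bounded batches a full rebalance would take.
    static int batches(List<String> imbalancedPartitions, int maxInFlight) {
        int batchCount = 0;
        for (int i = 0; i < imbalancedPartitions.size(); i += maxInFlight) {
            List<String> batch = imbalancedPartitions.subList(
                i, Math.min(i + maxInFlight, imbalancedPartitions.size()));
            // electPreferredLeaders(batch) would go here (hypothetical call);
            // wait for these elections to complete before the next batch,
            // so only maxInFlight leader moves are ever in flight at once.
            batchCount++;
        }
        return batchCount;
    }

    public static void main(String[] args) {
        List<String> parts = List.of("t-0", "t-1", "t-2", "t-3", "t-4");
        System.out.println(batches(parts, 2)); // prints 3
    }
}
```

The point is that replica fetchers are only disturbed for a bounded slice of partitions at a time instead of all imbalanced partitions at once.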
[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035517#comment-17035517 ] Evan Williams edited comment on KAFKA-4084 at 2/12/20 5:08 PM: --- [~sql_consulting] [~junrao] FYI, I implemented separation of replication/controller/client traffic, and this helped somewhat. However, I think in the end the only real solution is KIP-491. was (Author: blodsbror): [~sql_consulting] [~junrao] FYI, I implemented separation of replication/controller traffic, and this helped somewhat. However, I think in the end the only real solution is KIP-491. > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. There is an admin CLI command to trigger the preferred replica election > manually. 
There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. > One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On
[ https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035500#comment-17035500 ] John Roesler commented on KAFKA-9517: - Hi [~psnively], Just another quick update: now all the reported bugs for foreign-key joins are fixed and merged to the branches: trunk, 2.5, and 2.4. If you want to test the fixes, you can now just check out and build the 2.4 branch without any cherry-picking. There is still a blocker for 2.4.1, so [~bbejeck] hasn't created an actual release candidate yet. Thanks, -John > KTable Joins Without Materialized Argument Yield Results That Further Joins > NPE On > -- > > Key: KAFKA-9517 > URL: https://issues.apache.org/jira/browse/KAFKA-9517 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Paul Snively >Assignee: John Roesler >Priority: Blocker > Fix For: 2.5.0, 2.4.1 > > Attachments: test.tar.xz > > > The `KTable` API implemented > [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844] > calls `doJoinOnForeignKey` with an argument of > `Materialized.with(null, null)`, as apparently do several other APIs. As the > comment spanning [these lines|#L1098-L1099] makes clear, the result is a > `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, > attempts to `join` etc. on the resulting `KTable` fail with a > `NullPointerException`. > While there is an obvious workaround—explicitly construct the required > `Materialized` and use the APIs that take it as an argument—I have to admit I > find the existence of public APIs with this sort of bug, particularly when > the bug is literally documented as a comment in the source code, astonishing > to the point of incredulity. 
It calls the quality and trustworthiness of > Kafka Streams into serious question, and if a resolution is not forthcoming > within a week, we will be left with no other option but to consider technical > alternatives. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9543) Consumer offset reset after new segment rolling
Rafał Boniecki created KAFKA-9543: - Summary: Consumer offset reset after new segment rolling Key: KAFKA-9543 URL: https://issues.apache.org/jira/browse/KAFKA-9543 Project: Kafka Issue Type: Bug Affects Versions: 2.4.0 Reporter: Rafał Boniecki Attachments: Untitled.png After upgrading from Kafka 2.1.1 to 2.4.0, I'm experiencing unexpected consumer offset resets. Consumer: {code:java} 2020-02-12T11:12:58.402+01:00 hostname 4a2a39a35a02 [2020-02-12T11:12:58,402][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-1, groupId=logstash] Fetch offset 1632750575 is out of range for partition stats-5, resetting offset {code} Broker: {code:java} 2020-02-12 11:12:58:400 CET INFO [data-plane-kafka-request-handler-1][kafka.log.Log] [Log partition=stats-5, dir=/kafka4/data] Rolled new log segment at offset 1632750565 in 2 ms.{code} All resets are perfectly correlated with rolling new segments at the broker: the segment is rolled first, then, a couple of ms later, the reset occurs on the consumer. Attached is a Grafana graph with consumer lag per partition. All sudden spikes in lag are offset resets due to this bug. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values
[ https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035407#comment-17035407 ] Bruno Cadonna commented on KAFKA-9533: -- [~mviamari] Would you be interested in fixing this bug? > ValueTransform forwards `null` values > - > > Key: KAFKA-9533 > URL: https://issues.apache.org/jira/browse/KAFKA-9533 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Michael Viamari >Priority: Minor > > According to the documentation for `KStream#transformValues`, nulls returned > from `ValueTransformer#transform` are not forwarded. (see > [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-] > However, this does not appear to be the case. In > `KStreamTransformValuesProcessor#process` the result of the transform is > forwarded directly. > {code:java} > @Override > public void process(final K key, final V value) { > context.forward(key, valueTransformer.transform(key, value)); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
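A minimal sketch of the fix direction in plain Java (not the actual Streams processor; the forwarded list stands in for ProcessorContext#forward): check the transformer's result and drop nulls instead of forwarding them, matching the documented contract.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class NullGuardSketch {
    // stand-in for ProcessorContext#forward
    static final List<String> forwarded = new ArrayList<>();

    static <K, V, VR> void process(K key, V value,
                                   BiFunction<K, V, VR> transformer) {
        VR result = transformer.apply(key, value);
        if (result != null) {          // drop nulls instead of forwarding them
            forwarded.add(key + "=" + result);
        }
    }

    public static void main(String[] args) {
        BiFunction<String, Integer, String> t = (k, v) -> v > 0 ? "pos" : null;
        process("a", 1, t);   // forwarded downstream
        process("b", -1, t);  // transform returns null, record is dropped
        System.out.println(forwarded); // prints [a=pos]
    }
}
```

The guard is the entire change: today `KStreamTransformValuesProcessor#process` forwards the transform result unconditionally, as quoted above.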
[jira] [Created] (KAFKA-9542) ZSTD Compression Not Working
Prashant created KAFKA-9542: --- Summary: ZSTD Compression Not Working Key: KAFKA-9542 URL: https://issues.apache.org/jira/browse/KAFKA-9542 Project: Kafka Issue Type: Bug Components: compression Affects Versions: 2.3.0 Environment: Linux, CentOS Reporter: Prashant I enabled zstd compression at producer by adding "compression.type=zstd" in producer config. When try to run it, producer fails with "org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request" In Broker Logs, I could find following exception: [2020-02-12 11:48:04,623] ERROR [ReplicaManager broker=1] Error processing append operation on partition load_logPlPts-6 (kafka.server.ReplicaManager) org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.common.record.CompressionType$ZstdConstructors at org.apache.kafka.common.record.CompressionType$5.wrapForInput(CompressionType.java:133) at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257) at org.apache.kafka.common.record.DefaultRecordBatch.iterator(DefaultRecordBatch.java:324) at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:54) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(LogValidator.scala:269) at kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(LogValidator.scala:261) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:261) at 
kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:72) at kafka.log.Log$$anonfun$append$2.liftedTree1$1(Log.scala:869) at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868) at kafka.log.Log$$anonfun$append$2.apply(Log.scala:850) at kafka.log.Log.maybeHandleIOException(Log.scala:2065) at kafka.log.Log.append(Log.scala:850) at kafka.log.Log.appendAsLeader(Log.scala:819) at kafka.cluster.Partition$$anonfun$14.apply(Partition.scala:771) at kafka.cluster.Partition$$anonfun$14.apply(Partition.scala:759) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) This is fresh broker installed on "CentOS Linux" v7. This doesn't seem to be a classpath issue as same package is working on MacOS. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8037) KTable restore may load bad data
[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035215#comment-17035215 ] Patrik Kleindl commented on KAFKA-8037: --- [~ableegoldman] Interesting proposal. At 2. you mean "with_out_ deserializing", right? One question for me would be how you would remove records from the inverse-changelog in case a good record is received after a bad one. Otherwise the good one would be loaded and removed afterwards, unless you do some extra timestamp-checking etc. And if I understand it correctly, this would only work for problems with value deserialization, not if the key is affected, which is rare but possible. > KTable restore may load bad data > > > Key: KAFKA-8037 > URL: https://issues.apache.org/jira/browse/KAFKA-8037 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: pull-request-available > > If an input topic contains bad data, users can specify a > `deserialization.exception.handler` to drop corrupted records on read. > However, this mechanism may be by-passed on restore. Assume a > `builder.table()` call reads and drops a corrupted record. If the table state > is lost and restored from the changelog topic, the corrupted record may be > copied into the store, because on restore plain bytes are copied. > If the KTable is used in a join, an internal `store.get()` call to lookup the > record would fail with a deserialization exception if the value part cannot > be deserialized. > GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for > GlobalKTable case). It's unclear to me atm, how this issue could be addressed > for KTables though. > Note, that user state stores are not affected, because they always have a > dedicated changelog topic (and don't reuse an input topic) and thus the > corrupted record would not be written into the changelog. -- This message was sent by Atlassian Jira (v8.3.4#803005)
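To make the inverse-changelog proposal under discussion concrete, here is a toy sketch of the restore path (plain Java maps, not Streams internals; all names are illustrative): first copy the input-topic changelog as-is, then apply the inverse-changelog keys as deletes. As the comment above notes, a valid record arriving after a corrupted one for the same key would also be deleted unless extra bookkeeping (e.g. timestamp checks) is added.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class InverseChangelogRestore {
    // changelog: key -> value copied as plain bytes, no deserialization;
    // inverseKeys: keys of records the deserialization handler dropped.
    static Map<String, String> restore(Map<String, String> changelog,
                                       List<String> inverseKeys) {
        Map<String, String> store = new LinkedHashMap<>(changelog);
        for (String badKey : inverseKeys) {
            // an inverse-changelog record is just the key with a null value,
            // treated as a delete without ever deserializing the bad record
            store.remove(badKey);
        }
        return store;
    }

    public static void main(String[] args) {
        Map<String, String> changelog =
            new LinkedHashMap<>(Map.of("k1", "good", "k2", "corrupt"));
        System.out.println(restore(changelog, List.of("k2"))); // prints {k1=good}
    }
}
```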
[jira] [Issue Comment Deleted] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams
[ https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck updated KAFKA-9455: Comment: was deleted (was: [~guozhang] I have one more question What do you think about splitting a WindowStore into two stores?) > Consider using TreeMap for in-memory stores of Streams > -- > > Key: KAFKA-9455 > URL: https://issues.apache.org/jira/browse/KAFKA-9455 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: highluck >Priority: Major > Labels: newbie++ > > From [~ableegoldman]: It's worth noting that it might be a good idea to > switch to TreeMap for different reasons. Right now the ConcurrentSkipListMap > allows us to safely perform range queries without copying over the entire > keyset, but the performance on point queries seems to scale noticeably worse > with the number of unique keys. Point queries are used by aggregations while > range queries are used by windowed joins, but of course both are available > within the PAPI and for interactive queries so it's hard to say which we > should prefer. Maybe rather than make that tradeoff we should have one > version for efficient range queries (a "JoinWindowStore") and one for > efficient point queries ("AggWindowStore") - or something. I know we've had > similar thoughts for a different RocksDB store layout for Joins (although I > can't find that ticket anywhere..), it seems like the in-memory stores could > benefit from a special "Join" version as well cc/ Guozhang Wang > Here are some random thoughts: > 1. For kafka streams processing logic (i.e. without IQ), it's better to make > all processing logic relying on point queries rather than range queries. > Right now the only processor that use range queries are, as mentioned above, > windowed stream-stream joins. 
I think we should consider using a different > window implementation for this (and as a result also get rid of the > retainDuplicate flags) to refactor the windowed stream-stream join operation. > 2. With 1), range queries would only be exposed as IQ. Depending on its usage > frequency I think it makes lots of sense to optimize for single-point queries. > Of course, even without step 1) we should still consider using tree-map for > windowed in-memory stores to have a better scaling effect. -- This message was sent by Atlassian Jira (v8.3.4#803005)
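The point-vs-range trade-off discussed above can be illustrated with the JDK's sorted maps (toy data, not Streams code): both TreeMap and ConcurrentSkipListMap expose cheap range views via subMap, while point lookups are plain get calls whose scaling behavior differs between the two structures.

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

class SortedMapQueries {
    // Populate a sorted map with (timestamp -> value) entries.
    static NavigableMap<Long, String> fill(NavigableMap<Long, String> m) {
        m.put(10L, "a");
        m.put(20L, "b");
        m.put(30L, "c");
        return m;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> tree = fill(new TreeMap<>());
        NavigableMap<Long, String> skip = fill(new ConcurrentSkipListMap<>());

        // point query, the access pattern of aggregations
        System.out.println(tree.get(20L));                     // prints b
        // range query over [10, 25], the pattern of windowed joins
        System.out.println(skip.subMap(10L, true, 25L, true)); // prints {10=a, 20=b}
    }
}
```

Which structure wins depends on the workload mix, which is the motivation for the "JoinWindowStore" vs "AggWindowStore" split floated in the ticket.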