[jira] [Commented] (KAFKA-8817) Flaky Test KafkaProducerTest.testCloseIsForcedOnPendingAddOffsetRequest
[ https://issues.apache.org/jira/browse/KAFKA-8817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924729#comment-16924729 ]

Guozhang Wang commented on KAFKA-8817:
--------------------------------------

[~cpettitt-confluent] I encountered this issue while working on another PR, so I looked into it and filed a PR. Mind taking a look? Previously I could reproduce it roughly every 20 runs; now it's more stable.

> Flaky Test KafkaProducerTest.testCloseIsForcedOnPendingAddOffsetRequest
> ---
>
>          Key: KAFKA-8817
>          URL: https://issues.apache.org/jira/browse/KAFKA-8817
>      Project: Kafka
>   Issue Type: Bug
>   Components: core
>     Reporter: Chris Pettitt
>     Priority: Major
>       Labels: flaky-test
>
> Error:
> {code:java}
> org.junit.runners.model.TestTimedOutException: test timed out after 5000 milliseconds
>     at java.lang.Object.wait(Native Method)
>     at java.lang.Thread.join(Thread.java:1260)
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1190)
>     at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1167)
>     at org.apache.kafka.clients.producer.KafkaProducerTest.testCloseIsForcedOnPendingAddOffsetRequest(KafkaProducerTest.java:894)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> Currently 100% reproducible locally when running the whole test suite. Does not repro when running this test class individually. This is on the latest upstream trunk with no changes applied.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
[jira] [Commented] (KAFKA-8817) Flaky Test KafkaProducerTest.testCloseIsForcedOnPendingAddOffsetRequest
[ https://issues.apache.org/jira/browse/KAFKA-8817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924728#comment-16924728 ]

ASF GitHub Bot commented on KAFKA-8817:
---------------------------------------

guozhangwang commented on pull request #7313: KAFKA-8817: Remove timeout for the whole test
URL: https://github.com/apache/kafka/pull/7313

I bumped into this flaky test while working on another PR. It's a bit different from the reported failure, where it actually timed out while parsing localhost:port already. I think what we should really test is that the closing call can complete, so I removed the whole-test timeout and added a timeout on the shutdown latch instead.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Flaky Test KafkaProducerTest.testCloseIsForcedOnPendingAddOffsetRequest
> ---
>
>          Key: KAFKA-8817
>          URL: https://issues.apache.org/jira/browse/KAFKA-8817
>      Project: Kafka
>   Issue Type: Bug
>   Components: core
>     Reporter: Chris Pettitt
>     Priority: Major
>       Labels: flaky-test
>
> Error: org.junit.runners.model.TestTimedOutException: test timed out after 5000 milliseconds (full stack trace quoted in the first notification above)
> Currently 100% reproducible locally when running the whole test suite. Does not repro when running this test class individually. This is on the latest upstream trunk with no changes applied.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
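The PR's approach above — bound only the wait for the shutdown latch rather than the whole test — can be sketched in a self-contained form (the class and method names here are illustrative, not Kafka's actual test code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutSketch {
    // Bound only the wait for close() to finish, not the entire test body,
    // so slow unrelated setup (e.g. host:port parsing) cannot trip the timeout.
    static boolean awaitClose(CountDownLatch shutdownLatch, long timeoutMs) {
        try {
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        // Simulate the producer's close() completing on another thread.
        new Thread(shutdownLatch::countDown).start();
        System.out.println(awaitClose(shutdownLatch, 5000) ? "closed" : "timed out");
    }
}
```

The design point is that a JUnit method-level timeout covers everything the test does, while the latch timeout asserts only the behavior under test: that close completes.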
[jira] [Commented] (KAFKA-8421) Allow consumer.poll() to return data in the middle of rebalance
[ https://issues.apache.org/jira/browse/KAFKA-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924725#comment-16924725 ]

ASF GitHub Bot commented on KAFKA-8421:
---------------------------------------

guozhangwang commented on pull request #7312: KAFKA-8421: Still return data during rebalance
URL: https://github.com/apache/kafka/pull/7312

1. Do not wait until `updateAssignmentMetadataIfNeeded` returns true; call it only once with a 0 timeout, and do not return empty if in a rebalance.
2. Trim the pre-fetched records after the long poll, since the assignment may have changed.
3. Update SubscriptionState to retain the state in `assignFromSubscribed` if it already exists (similar to `assignFromUser`), so that we do not need the transition from INITIALIZING to FETCHING.
4. Unit test: this actually took me the most time :)

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Allow consumer.poll() to return data in the middle of rebalance
> ---
>
>          Key: KAFKA-8421
>          URL: https://issues.apache.org/jira/browse/KAFKA-8421
>      Project: Kafka
>   Issue Type: Sub-task
>   Components: consumer
>     Reporter: Guozhang Wang
>     Priority: Major
>
> With KIP-429 in place, when a consumer is about to send a join-group request its owned partitions may not be empty, meaning that some of its fetched data can still be returned. Nevertheless, today the logic is strict:
> {code}
> if (!updateAssignmentMetadataIfNeeded(timer)) {
>     return ConsumerRecords.empty();
> }
> {code}
> I.e. if the consumer enters a rebalance it always returns no data.
> As an optimization, we can consider letting consumers still return messages that belong to their owned partitions even while within a rebalance, because we know it is safe: no one else will claim those partitions in this rebalance yet, and we can still commit offsets if the partitions need to be revoked after this rebalance.
> One thing we need to take care of, though, is the rebalance timeout: while consumers are processing those records they may not call the next poll() in time (think: Kafka Streams' num.iterations mechanism), which may lead to the consumer dropping out of the group during the rebalance.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
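Step 2 of the PR description — trimming pre-fetched records to the partitions the consumer still owns after a rebalance — can be sketched with plain collections (the helper name and simplified types are hypothetical, not the consumer's real internals, which operate on TopicPartition and Fetcher buffers):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TrimFetchedSketch {
    // Keep only pre-fetched records whose partition is still in the assignment;
    // records for revoked partitions must not be handed to the application.
    static Map<String, List<Integer>> trimToOwned(Map<String, List<Integer>> fetched,
                                                  Set<String> owned) {
        Map<String, List<Integer>> kept = new HashMap<>();
        for (Map.Entry<String, List<Integer>> e : fetched.entrySet()) {
            if (owned.contains(e.getKey())) {
                kept.put(e.getKey(), e.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> fetched = new HashMap<>();
        fetched.put("topic-0", Arrays.asList(1, 2, 3));
        fetched.put("topic-1", Arrays.asList(4, 5));
        // Suppose topic-1 was revoked during the rebalance.
        System.out.println(trimToOwned(fetched, Set.of("topic-0")).keySet()); // [topic-0]
    }
}
```

This is why returning data mid-rebalance is safe: anything handed back is filtered to partitions no other member can yet claim.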
[jira] [Created] (KAFKA-8885) The Kafka Protocol should Support Optional Tagged Fields
Colin P. McCabe created KAFKA-8885:
-----------------------------------

     Summary: The Kafka Protocol should Support Optional Tagged Fields
         Key: KAFKA-8885
         URL: https://issues.apache.org/jira/browse/KAFKA-8885
     Project: Kafka
  Issue Type: New Feature
    Reporter: Colin P. McCabe
    Assignee: Colin P. McCabe

Implement KIP-482: The Kafka Protocol should Support Optional Tagged Fields

See [KIP-482|https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields]

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
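The core idea of KIP-482 is that each optional field travels as a (tag, size, data) triple, so readers can skip tags they do not recognize. The toy encoder below illustrates only that shape; KIP-482 actually encodes tag and size as unsigned varints, while this sketch writes single bytes (valid only for values below 128) and is not Kafka's wire format:

```java
import java.io.ByteArrayOutputStream;

public class TaggedFieldSketch {
    // Illustrative (tag, size, data) framing in the spirit of KIP-482.
    static byte[] encode(int tag, byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(tag);           // which optional field this is
        out.write(data.length);   // how many data bytes follow
        out.write(data, 0, data.length);
        return out.toByteArray(); // a reader can skip an unknown tag by its size
    }

    public static void main(String[] args) {
        byte[] field = encode(0, new byte[] {42});
        System.out.println(field.length); // tag byte + size byte + 1 data byte = 3
    }
}
```

Because the size always precedes the data, old clients can step over fields added by newer protocol versions without understanding them.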
[jira] [Assigned] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryanne Dolan reassigned KAFKA-7500:
-----------------------------------

    Assignee: Manikumar

> MirrorMaker 2.0 (KIP-382)
> ---
>
>              Key: KAFKA-7500
>              URL: https://issues.apache.org/jira/browse/KAFKA-7500
>          Project: Kafka
>       Issue Type: New Feature
>       Components: KafkaConnect, mirrormaker
> Affects Versions: 2.4.0
>         Reporter: Ryanne Dolan
>         Assignee: Manikumar
>         Priority: Major
>           Labels: pull-request-available, ready-to-commit
>          Fix For: 2.4.0
>
>      Attachments: Active-Active XDCR setup.png
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
[jira] [Updated] (KAFKA-8884) Improve streams errors on class cast exception in ProcessorsNodes
[ https://issues.apache.org/jira/browse/KAFKA-8884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-8884:
-----------------------------------

    Affects Version/s:     (was: 2.3.1)

> Improve streams errors on class cast exception in ProcessorsNodes
> ---
>
>          Key: KAFKA-8884
>          URL: https://issues.apache.org/jira/browse/KAFKA-8884
>      Project: Kafka
>   Issue Type: Improvement
>   Components: streams
>     Reporter: Antony Stubbs
>     Assignee: Antony Stubbs
>     Priority: Minor
>
> [https://github.com/apache/kafka/pull/7309]
>
> If a processor causes a class cast exception, at the moment you get a rather cryptic error if you're not used to them, without a context-sensitive suggestion as to what could be wrong. Often these are caused by misconfigured (default) Serdes.
>
> As an example of the improvement over the plain class cast exception:
> {code}
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=streams-plaintext-input, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Deserialized input types are: key: [B, value: [B
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:123)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
>     at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:419)
>     at org.apache.kafka.streams.processor.internals.ProcessorNodeTest.testTopologyLevelClassCastException(ProcessorNodeTest.java:176)
>     ... (reflection and JUnit runner frames omitted)
> Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')
>     at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
>     at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:119)
>     ... 32 more
> {code}

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
[jira] [Assigned] (KAFKA-8884) Improve streams errors on class cast exception in ProcessorsNodes
[ https://issues.apache.org/jira/browse/KAFKA-8884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reassigned KAFKA-8884:
--------------------------------------

    Assignee: Antony Stubbs

> Improve streams errors on class cast exception in ProcessorsNodes
> ---
>
>          Key: KAFKA-8884
>          URL: https://issues.apache.org/jira/browse/KAFKA-8884
>      Project: Kafka
>   Issue Type: Improvement
>   Components: streams
> Affects Versions: 2.3.1
>     Reporter: Antony Stubbs
>     Assignee: Antony Stubbs
>     Priority: Minor
>
> [https://github.com/apache/kafka/pull/7309]
>
> If a processor causes a class cast exception, at the moment you get a rather cryptic error if you're not used to them, without a context-sensitive suggestion as to what could be wrong. Often these are caused by misconfigured (default) Serdes.
>
> (The example StreamsException stack trace showing the improved message is quoted in full in the earlier notification above.)

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
[jira] [Updated] (KAFKA-8884) Improve streams errors on class cast exception in ProcessorsNodes
[ https://issues.apache.org/jira/browse/KAFKA-8884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Antony Stubbs updated KAFKA-8884:
---------------------------------

    Description:
[https://github.com/apache/kafka/pull/7309]

If a processor causes a class cast exception, at the moment you get a rather cryptic error if you're not used to them, without a context-sensitive suggestion as to what could be wrong. Often these are caused by misconfigured (default) Serdes.

As an example of the improvement over the plain class cast exception, see the StreamsException stack trace ("ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? ...") quoted in full in the earlier notification above.

    was: If a processor causes a class cast exception, atm you get a bit of a cryptic error if you're not used to them,
[jira] [Commented] (KAFKA-8660) Make ValueToKey SMT work only on a whitelist of topics
[ https://issues.apache.org/jira/browse/KAFKA-8660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924269#comment-16924269 ]

Marc Löhe commented on KAFKA-8660:
----------------------------------

I added a proof-of-concept implementation of a topic whitelist configuration option for all SMTs here: [https://github.com/apache/kafka/pull/7308]

It would be amazing to get some feedback on how we can find a solution for the original problem.

> Make ValueToKey SMT work only on a whitelist of topics
> ---
>
>          Key: KAFKA-8660
>          URL: https://issues.apache.org/jira/browse/KAFKA-8660
>      Project: Kafka
>   Issue Type: Improvement
>   Components: KafkaConnect
>     Reporter: Marc Löhe
>     Priority: Minor
>       Labels: needs-kip
>
> For source connectors that publish on multiple topics it is essential to be able to configure transforms to be active only for certain topics. I'll add a PR to implement this on the example of the ValueToKey SMT.
> I'm also interested in opinions on whether this would make sense to add as a configurable option to all packaged SMTs, or even as a capability for SMTs in general.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
[jira] [Updated] (KAFKA-8660) Make ValueToKey SMT work only on a whitelist of topics
[ https://issues.apache.org/jira/browse/KAFKA-8660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marc Löhe updated KAFKA-8660:
-----------------------------

    Description:
For source connectors that publish on multiple topics (e.g. [Debezium|https://debezium.io/]) it is essential to be able to configure transforms to be active only for certain topics. I'll add a PR to implement this on the example of the ValueToKey SMT.

I'm also interested in opinions on whether this would make sense to add as a configurable option to all packaged SMTs, or even as a capability for SMTs in general.

    was:
For source connectors that publish on multiple topics it is essential to be able to configure transforms to be active only for certain topics. I'll add a PR to implement this on the example of the ValueToKey SMT.

I'm also interested in opinions on whether this would make sense to add as a configurable option to all packaged SMTs, or even as a capability for SMTs in general.

> Make ValueToKey SMT work only on a whitelist of topics
> ---
>
>          Key: KAFKA-8660
>          URL: https://issues.apache.org/jira/browse/KAFKA-8660
>      Project: Kafka
>   Issue Type: Improvement
>   Components: KafkaConnect
>     Reporter: Marc Löhe
>     Priority: Minor
>       Labels: needs-kip
>
> For source connectors that publish on multiple topics (e.g. [Debezium|https://debezium.io/]) it is essential to be able to configure transforms to be active only for certain topics. I'll add a PR to implement this on the example of the ValueToKey SMT.
> I'm also interested in opinions on whether this would make sense to add as a configurable option to all packaged SMTs, or even as a capability for SMTs in general.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
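The proposed topic-whitelist behavior can be sketched without Connect itself: apply the underlying transform only when the record's topic is whitelisted, and pass everything else through untouched. The names below are invented for illustration and are not Connect's real Transformation API (which operates on ConnectRecord):

```java
import java.util.Set;
import java.util.function.UnaryOperator;

public class TopicWhitelistSketch {
    // Hypothetical wrapper: gate a value transform on the record's topic.
    static String applyIfWhitelisted(String topic, String value,
                                     Set<String> whitelist,
                                     UnaryOperator<String> transform) {
        return whitelist.contains(topic) ? transform.apply(value) : value;
    }

    public static void main(String[] args) {
        Set<String> whitelist = Set.of("orders");
        UnaryOperator<String> upper = String::toUpperCase;
        // Whitelisted topic: transformed. Other topics: returned unchanged.
        System.out.println(applyIfWhitelisted("orders", "abc", whitelist, upper)); // ABC
        System.out.println(applyIfWhitelisted("audit", "abc", whitelist, upper));  // abc
    }
}
```

Wrapping the transform rather than changing each SMT is one way such a feature could apply uniformly to all packaged SMTs, which is the generalization the issue asks about.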
[jira] [Commented] (KAFKA-8884) Improve streams errors on class cast exception in ProcessorsNodes
[ https://issues.apache.org/jira/browse/KAFKA-8884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924267#comment-16924267 ]

ASF GitHub Bot commented on KAFKA-8884:
---------------------------------------

astubbs commented on pull request #7309: MINOR: KAFKA-8884: class cast exception improvement
URL: https://github.com/apache/kafka/pull/7309

If a processor causes a class cast exception, at the moment you get a rather cryptic error if you're not used to them, without a context-sensitive suggestion as to what could be wrong. Often these are caused by misconfigured (default) Serdes.

Old error:
```
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')
```

An example of the improvement over the plain class cast exception:
```
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=streams-plaintext-input, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Deserialized input types are: key: [B, value: [B
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:123)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)
    at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:419)
    at org.apache.kafka.streams.processor.internals.ProcessorNodeTest.testTopologyLevelClassCastException(ProcessorNodeTest.java:176)
    ... (reflection and JUnit runner frames omitted)
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
    at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:119)
    ... 32 more
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
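The shape of the PR's improvement — catch the raw ClassCastException and rethrow it wrapped with a Serde-configuration hint — can be sketched in a self-contained way (class name, method name, and exact message text here are illustrative, not Kafka's actual ProcessorNode code):

```java
public class ClassCastHintSketch {
    // Wrap a bare ClassCastException with a context-sensitive suggestion,
    // preserving the original as the cause for the full stack trace.
    static RuntimeException withSerdeHint(ClassCastException e, String processor) {
        return new RuntimeException(
            "ClassCastException invoking Processor " + processor
            + ". Do the Processor's input types match the deserialized types?"
            + " Check the Serde setup and the default Serdes in StreamConfig.", e);
    }

    public static void main(String[] args) {
        Object value = new byte[] {1};  // e.g. bytes a misconfigured Serde left undeserialized
        try {
            String s = (String) value;  // the kind of cast that then fails downstream
            System.out.println(s);
        } catch (ClassCastException e) {
            System.out.println(withSerdeHint(e, "KSTREAM-SOURCE-00").getMessage());
        }
    }
}
```

Keeping the original exception as the cause means nothing is lost: the cryptic `[B cannot be cast` line still appears, but below a message that points at the likely misconfiguration.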
[jira] [Updated] (KAFKA-8884) Improve streams errors on class cast exception in ProcessorsNodes
[ https://issues.apache.org/jira/browse/KAFKA-8884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Antony Stubbs updated KAFKA-8884:
---------------------------------

    Description:
If a processor causes a class cast exception, at the moment you get a rather cryptic error if you're not used to them, without a context-sensitive suggestion as to what could be wrong. Often these are caused by misconfigured (default) Serdes.

As an example of the improvement over the plain class cast exception, see the StreamsException stack trace ("ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? ...") quoted in full in the earlier notifications above.

    was: If a processor causes a class cast exception, atm you get a bit of a cryptic error if you're not used to them, and without a context sensitive suggestion as to what could be wrong. Often these can be
[jira] [Updated] (KAFKA-8884) Improve streams errors on class cast exception in ProcessorsNodes
[ https://issues.apache.org/jira/browse/KAFKA-8884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Antony Stubbs updated KAFKA-8884: - Description: If a processor causes a class cast exception, atm you get a bit of a cryptic error if you're not used to them, and without a context sensitive suggestion as to what could be wrong. Often these can be cause by missconfigured Serdes (defaults). As an example of the improvement over the case exception: {{ }}{{org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=streams-plaintext-input, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Deserialized input types are: key: [B, value: [B}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:123)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)}} {{ at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)}} {{ at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)}} {{ at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:419)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNodeTest.testTopologyLevelClassCastException(ProcessorNodeTest.java:176)}} {{ at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)}} {{ at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)}} {{ at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)}} {{ at java.base/java.lang.reflect.Method.invoke(Method.java:566)}} {{ at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)}} {{ at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)}} {{ at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)}} {{ at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)}} {{ at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)}} {{ at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)}} {{ at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)}} {{ at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)}} {{ at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)}} {{ at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)}} {{ at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)}} {{ at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)}} {{ at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)}} {{ at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)}} {{ at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)}} {{ at org.junit.runners.ParentRunner.run(ParentRunner.java:412)}} {{ at org.junit.runner.JUnitCore.run(JUnitCore.java:137)}} {{ at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)}} {{ at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)}} {{ at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)}} {{ at 
com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)}} {{ Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')}} {{ at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)}} {{ at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:119)}} {{ ... 32 more}} was: If a processor causes a class cast exception, atm you get a bit of a cryptic error if you're not used to them, and without a context sensitive suggestion as to what could be wrong. Often these can be cause by missconfigured Serdes (defaults). As an example of the improvement over the case exception: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0,
[jira] [Updated] (KAFKA-8884) Improve streams errors on class cast exception in ProcessorsNodes
[ https://issues.apache.org/jira/browse/KAFKA-8884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Antony Stubbs updated KAFKA-8884: - Description: If a processor causes a class cast exception, at the moment you get a rather cryptic error if you're not used to them, with no context-sensitive suggestion as to what could be wrong. Often these are caused by misconfigured (default) Serdes. As an example of the improvement over the current exception: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=streams-plaintext-input, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Deserialized input types are: key: [B, value: [B at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:123) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366) at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:419) at org.apache.kafka.streams.processor.internals.ProcessorNodeTest.testTopologyLevelClassCastException(ProcessorNodeTest.java:176) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.ParentRunner.run(ParentRunner.java:412) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap') at 
org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103) at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:119) ... 32 more was: If a processor causes a class cast exception, atm you get a bit of a cryptic error if you're not used to them, and without a context sensitive suggestion as to what could be wrong. Often these can be cause by missconfigured Serdes (defaults). As an example of the improvement over the case exception: {{org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=streams-plaintext-input, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: A deserializer (key:
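The `class [B cannot be cast to class java.lang.String` failure in the stack trace above boils down to a byte[] value (what a byte-array Serde yields) reaching user code that assumes String. A minimal, Kafka-free sketch of that mismatch (class and method names here are illustrative, not Kafka API):

```java
// Minimal sketch, no Kafka dependency: a value deserialized as byte[]
// reaches a processor that assumes String, and the cast fails with the
// same "[B cannot be cast to ... String" seen in the stack trace above.
public class SerdeMismatchSketch {
    // Stand-in for a processor whose declared input type is String.
    static String process(Object deserializedValue) {
        return ((String) deserializedValue).toUpperCase();
    }

    // Runs the processor and reports the ClassCastException instead of dying.
    static String tryProcess(Object value) {
        try {
            return process(value);
        } catch (ClassCastException e) {
            return "ClassCastException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryProcess("hello"));            // String in: works
        System.out.println(tryProcess("hello".getBytes())); // byte[] in: cast fails
    }
}
```

The improved message proposed in this ticket points at exactly this mismatch: make the configured Serdes produce the types the processor expects.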
[jira] [Commented] (KAFKA-8660) Make ValueToKey SMT work only on a whitelist of topics
[ https://issues.apache.org/jira/browse/KAFKA-8660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924264#comment-16924264 ] ASF GitHub Bot commented on KAFKA-8660: --- bfncs commented on pull request #7308: KAFKA-8660: Add connect SMT topic whitelist config URL: https://github.com/apache/kafka/pull/7308 For source connectors that publish on multiple topics it is essential to be able to configure transforms to be active only for certain topics. This adds the possibility to define a whitelist of topics each single message transform (SMT) is applied to by adding a comma-separated list of these topics as key `topics` to the transforms configuration. It solves KAFKA-8660 not only for a single but for all SMTs and is an alternative to #7084. Since this introduces a change breaking backwards compatibility (because anyone could have used `topics` as a configuration key already), this PR is merely a request for comments: would this be something the maintainers of this project would be interested in adding? If yes, a strategy for smooth migration would be needed, and I would add the needed integration tests and documentation. ## Example configuration: ``` transforms: "ValueToKey" transforms.ValueToKey.type: "org.apache.kafka.connect.transforms.ValueToKey" transforms.ValueToKey.fields: "userId,city,state" transforms.ValueToKey.topics: "my-addresses" ``` This makes sure the transform `ValueToKey` is only applied for records published on the `my-addresses` topic. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make ValueToKey SMT work only on a whitelist of topics > -- > > Key: KAFKA-8660 > URL: https://issues.apache.org/jira/browse/KAFKA-8660 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Marc Löhe >Priority: Minor > Labels: needs-kip > > For source connectors that publish on multiple topics it is essential to be > able to configure transforms to be active only for certain topics. I'll add a > PR to implement this on the example of the ValueToKey SMT. > I'm also interested in opinions on whether this would make sense to add as a > configurable option to all packaged SMTs or even as a capability for SMTs in > general. -- This message was sent by Atlassian Jira (v8.3.2#803003)
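The gating the PR describes can be sketched independently of the Connect API: parse the comma-separated `topics` value and apply the wrapped transform only when the record's topic is whitelisted. `TopicGatedTransform` and its method shapes below are illustrative assumptions, not the actual Connect `Transformation` interface:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.UnaryOperator;

// Illustrative sketch of the proposed behavior: an SMT wrapped so it only
// fires for records published on whitelisted topics. Not the real Connect API.
public class TopicGatedTransform {
    private final Set<String> whitelist;
    private final UnaryOperator<String> delegate; // stand-in for the real SMT

    public TopicGatedTransform(String topicsConfig, UnaryOperator<String> delegate) {
        // Mirrors the proposed comma-separated `topics` configuration key.
        this.whitelist = new HashSet<>(Arrays.asList(topicsConfig.split(",")));
        this.delegate = delegate;
    }

    // Apply the delegate only for whitelisted topics; pass through otherwise.
    public String apply(String topic, String record) {
        return whitelist.contains(topic) ? delegate.apply(record) : record;
    }
}
```

This also makes the compatibility concern in the PR concrete: any existing transform that already used `topics` as a config key would suddenly be gated, hence the request for a migration strategy.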
[jira] [Created] (KAFKA-8884) Improve streams errors on class cast exception in ProcessorsNodes
Antony Stubbs created KAFKA-8884: Summary: Improve streams errors on class cast exception in ProcessorsNodes Key: KAFKA-8884 URL: https://issues.apache.org/jira/browse/KAFKA-8884 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.3.1 Reporter: Antony Stubbs If a processor causes a class cast exception, at the moment you get a rather cryptic error if you're not used to them, with no context-sensitive suggestion as to what could be wrong. Often these are caused by misconfigured (default) Serdes. As an example of the improvement over the current exception: {{org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=streams-plaintext-input, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: A deserializer (key: org.apache.kafka.common.serialization.ByteArrayDeserializer / value: org.apache.kafka.common.serialization.ByteArrayDeserializer) is not compatible to the actual key or value type (key type: [B / value type: [B). 
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.}} {{ at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)}} {{ at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)}} {{ at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:419)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNodeTest.testTopologyLevelClassCastException(ProcessorNodeTest.java:176)}} {{ at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)}} {{ at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)}} {{ at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)}} {{ at java.base/java.lang.reflect.Method.invoke(Method.java:566)}} {{ at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)}} {{ at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)}} {{ at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)}} {{ at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)}} {{ at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)}} {{ at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)}} {{ at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)}} {{ at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)}} {{ at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)}} {{ at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)}} {{ at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)}} {{ at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)}} {{ at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)}} {{ at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)}} {{ at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)}} {{ at org.junit.runners.ParentRunner.run(ParentRunner.java:412)}} {{ at org.junit.runner.JUnitCore.run(JUnitCore.java:137)}} {{ at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)}} {{ at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)}} {{ at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)}} {{ at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)}} {{Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')}} {{ at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)}} {{ at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)}} {{ at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:89)}} {{ ... 28 more}} -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8883) Kafka Consumer continuously echoing INFO messages Seeking to offset 0 for partition
[ https://issues.apache.org/jira/browse/KAFKA-8883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Veerabhadra Rao Mallavarapu updated KAFKA-8883: --- Summary: Kafka Consumer continuously echoing INFO messages Seeking to offset 0 for partition (was: Kafka Consumer continuously displaying info message Seeking to offset 0 for partition) > Kafka Consumer continuously echoing INFO messages Seeking to offset 0 for > partition > --- > > Key: KAFKA-8883 > URL: https://issues.apache.org/jira/browse/KAFKA-8883 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Veerabhadra Rao Mallavarapu >Priority: Major > Fix For: 2.3.0 > > Attachments: kafka_230.txt > > > KafkaConsumer continously echoing INFO messages which is leading to growing > my log file size to very huge within minutes. Please do the needful asap. > Earlier it was a debug message, in latest version 2.3.0, It is changed to > INFO. > Please refer KafkaConsumer.java line 1545 > > 09-06@06:27:38 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-3, groupId=reporting-agent - ProgramLikeCountSplitter - 0] > Seeking to offset 0 for partition ProgramLikeCountSplitter-0 > 09-06@06:27:38 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-13, groupId=reporting-agent - ProgramCommented - 0] Seeking > to offset 0 for partition ProgramCommented-0 > 09-06@06:27:38 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-2, groupId=reporting-agent - ProgramCommentCountSplitter - > 0] Seeking to offset 0 for partition ProgramCommentCountSplitter-0 > 09-06@06:27:38 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-7, groupId=reporting-agent - MigrateUserView - 0] Seeking > to offset 0 for partition MigrateUserView-0 > 09-06@06:27:38 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-11, groupId=reporting-agent - MigrateBroadcastSummary - 0] > Seeking to offset 0 for partition MigrateBroadcastSummary-0 > 
09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-9, groupId=reporting-agent - ElogPlaybackRecord - 0] > Seeking to offset 0 for partition ElogPlaybackRecord-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-17, groupId=reporting-agent - PortalSearchReport - 0] > Seeking to offset 10 for partition PortalSearchReport-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-10, groupId=reporting-agent - ProgramLikeCount - 0] Seeking > to offset 0 for partition ProgramLikeCount-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-19, groupId=reporting-agent - UserLoginReport - 0] Seeking > to offset 4 for partition UserLoginReport-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-14, groupId=reporting-agent - ChannelChangeReport - 0] > Seeking to offset 0 for partition ChannelChangeReport-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-20, groupId=reporting-agent - ProgramViewCount - 0] Seeking > to offset 0 for partition ProgramViewCount-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-5, groupId=reporting-agent - BroadcastSummary - 0] Seeking > to offset 0 for partition BroadcastSummary-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-4, groupId=reporting-agent - UserView - 0] Seeking to > offset 0 for partition UserView-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-1, groupId=reporting-agent - ProgramViewed - 0] Seeking to > offset 0 for partition ProgramViewed-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-18, groupId=reporting-agent - ProgramLiked - 0] Seeking to > offset 0 for partition ProgramLiked-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-21, groupId=reporting-agent - ProgramChangeReport - 0] > Seeking to 
offset 0 for partition ProgramChangeReport-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-16, groupId=reporting-agent - ProgramCommentCount - 0] > Seeking to offset 0 for partition ProgramCommentCount-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-8, groupId=reporting-agent - ChannelViewReport - 0] Seeking > to offset 0 for partition ChannelViewReport-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer > clientId=consumer-15, groupId=reporting-agent - ProgramFavoriteCountSplitter > - 0] Seeking to offset 0 for partition ProgramFavoriteCountSplitter-0 > 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer >
[jira] [Created] (KAFKA-8883) Kafka Consumer continuously displaying info message Seeking to offset 0 for partition
Veerabhadra Rao Mallavarapu created KAFKA-8883: -- Summary: Kafka Consumer continuously displaying info message Seeking to offset 0 for partition Key: KAFKA-8883 URL: https://issues.apache.org/jira/browse/KAFKA-8883 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.3.0 Reporter: Veerabhadra Rao Mallavarapu Fix For: 2.3.0 Attachments: kafka_230.txt KafkaConsumer is continuously echoing INFO messages, which grows my log file to a huge size within minutes. Please address this as soon as possible. Earlier this was a DEBUG message; in the latest version, 2.3.0, it was changed to INFO. Please refer to KafkaConsumer.java line 1545. 09-06@06:27:38 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-3, groupId=reporting-agent - ProgramLikeCountSplitter - 0] Seeking to offset 0 for partition ProgramLikeCountSplitter-0 09-06@06:27:38 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-13, groupId=reporting-agent - ProgramCommented - 0] Seeking to offset 0 for partition ProgramCommented-0 09-06@06:27:38 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-2, groupId=reporting-agent - ProgramCommentCountSplitter - 0] Seeking to offset 0 for partition ProgramCommentCountSplitter-0 09-06@06:27:38 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-7, groupId=reporting-agent - MigrateUserView - 0] Seeking to offset 0 for partition MigrateUserView-0 09-06@06:27:38 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-11, groupId=reporting-agent - MigrateBroadcastSummary - 0] Seeking to offset 0 for partition MigrateBroadcastSummary-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-9, groupId=reporting-agent - ElogPlaybackRecord - 0] Seeking to offset 0 for partition ElogPlaybackRecord-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-17, groupId=reporting-agent - PortalSearchReport - 0] Seeking to offset 10 for partition PortalSearchReport-0 09-06@06:27:39 INFO 
(KafkaConsumer.java:1545) - [Consumer clientId=consumer-10, groupId=reporting-agent - ProgramLikeCount - 0] Seeking to offset 0 for partition ProgramLikeCount-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-19, groupId=reporting-agent - UserLoginReport - 0] Seeking to offset 4 for partition UserLoginReport-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-14, groupId=reporting-agent - ChannelChangeReport - 0] Seeking to offset 0 for partition ChannelChangeReport-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-20, groupId=reporting-agent - ProgramViewCount - 0] Seeking to offset 0 for partition ProgramViewCount-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-5, groupId=reporting-agent - BroadcastSummary - 0] Seeking to offset 0 for partition BroadcastSummary-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-4, groupId=reporting-agent - UserView - 0] Seeking to offset 0 for partition UserView-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-1, groupId=reporting-agent - ProgramViewed - 0] Seeking to offset 0 for partition ProgramViewed-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-18, groupId=reporting-agent - ProgramLiked - 0] Seeking to offset 0 for partition ProgramLiked-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-21, groupId=reporting-agent - ProgramChangeReport - 0] Seeking to offset 0 for partition ProgramChangeReport-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-16, groupId=reporting-agent - ProgramCommentCount - 0] Seeking to offset 0 for partition ProgramCommentCount-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-8, groupId=reporting-agent - ChannelViewReport - 0] Seeking to offset 0 for partition ChannelViewReport-0 09-06@06:27:39 INFO 
(KafkaConsumer.java:1545) - [Consumer clientId=consumer-15, groupId=reporting-agent - ProgramFavoriteCountSplitter - 0] Seeking to offset 0 for partition ProgramFavoriteCountSplitter-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-6, groupId=reporting-agent - ProgramViewingSplitter - 0] Seeking to offset 0 for partition ProgramViewingSplitter-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-12, groupId=reporting-agent - DownloadReport - 0] Seeking to offset 0 for partition DownloadReport-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer clientId=consumer-3, groupId=reporting-agent - ProgramLikeCountSplitter - 0] Seeking to offset 0 for partition ProgramLikeCountSplitter-0 09-06@06:27:39 INFO (KafkaConsumer.java:1545) - [Consumer
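Until the log level is changed upstream, a common workaround is to raise the consumer logger's threshold in the application's logging configuration. A sketch in Log4j 1.x properties syntax (an assumption; adapt to whatever logging backend the application actually uses):

```properties
# Suppress the per-partition "Seeking to offset ..." INFO lines by raising
# the KafkaConsumer logger above INFO (Log4j 1.x syntax; adapt as needed).
log4j.logger.org.apache.kafka.clients.consumer.KafkaConsumer=WARN
```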
[jira] [Comment Edited] (KAFKA-6745) kafka consumer rebalancing takes long time (from 3 secs to 5 minutes)
[ https://issues.apache.org/jira/browse/KAFKA-6745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16922375#comment-16922375 ] Xin edited comment on KAFKA-6745 at 9/6/19 11:30 AM: - Same problem. Kafka version: 1.1.0, Spark Streaming application; one job has 2 consumers. Received the "successful join group" response after roughly session.timeout.ms. was (Author: auroraxlh): same problem,kafka version:1.1.0,spark streaming application Received successful join group response after 5min > kafka consumer rebalancing takes long time (from 3 secs to 5 minutes) > - > > Key: KAFKA-6745 > URL: https://issues.apache.org/jira/browse/KAFKA-6745 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 0.11.0.0 >Reporter: Ramkumar >Priority: Major > > Hi, we had an HTTP service of 3 nodes around Kafka 0.8. This HTTP service acts > as a REST API for publishers and consumers to use middleware instead of > using the Kafka client API. Here the consumer rebalance is not a major > issue. > We wanted to upgrade to Kafka 0.11 and have updated our HTTP services (3-node > cluster) to use the new Kafka consumer API, but rebalancing of > consumers (multiple consumers under the same group) takes between a few seconds and 5 minutes > (max.poll.interval.ms). Because of this our HTTP clients time out > and fail over. This rebalancing time is the major issue. It is not clear from > the documentation whether rebalance activity for the group takes place after > max.poll.interval.ms, or starts after 3 secs and completes any time within > 5 minutes. We tried to reduce max.poll.interval.ms to 15 seconds, but this > also triggers a rebalance internally. 
> Below are the other parameters we have set in our service: > max.poll.interval.ms = 30 seconds > heartbeat.interval.ms = 1 minute > session.timeout.ms = 4 minutes > consumer.cache.timeout = 2 min > > Below is the log: > ""2018-03-26 12:53:23,009 [qtp1404928347-11556] INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - > (Re-)joining group firstnetportal_001 > ""2018-03-26 12:57:52,793 [qtp1404928347-11556] INFO > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - > Successfully joined group firstnetportal_001 with generation 7475 > Please let me know if there are any other applications/clients that use the HTTP > interface on 3 nodes without having this issue. -- This message was sent by Atlassian Jira (v8.3.2#803003)
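The timing parameters above interact: heartbeats must fire several times per session timeout, and a rebalance completes only once every member calls poll() again, so max.poll.interval.ms effectively bounds how long it can stall. A small sanity-check sketch of those rule-of-thumb relationships (the check itself is illustrative, not a Kafka API):

```java
// Rule-of-thumb sanity check for consumer timing configs (illustrative,
// not a Kafka API). A rebalance finishes only after all members rejoin,
// so max.poll.interval.ms bounds how long it can take.
public class ConsumerTimingSketch {
    static boolean timingsLookSane(long heartbeatMs, long sessionMs, long maxPollIntervalMs) {
        // Heartbeat should fire at least ~3x per session timeout, and the
        // session timeout should sit below the poll-interval upper bound.
        return heartbeatMs * 3 <= sessionMs && sessionMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Settings resembling the report (heartbeat 1 min, session 4 min,
        // max.poll.interval.ms 30 s) invert the expected ordering.
        System.out.println(timingsLookSane(60_000, 240_000, 30_000));
        // Values in the spirit of the client defaults pass the check.
        System.out.println(timingsLookSane(3_000, 10_000, 300_000));
    }
}
```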
[jira] [Comment Edited] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924152#comment-16924152 ] Valentin Florea edited comment on KAFKA-7447 at 9/6/19 11:13 AM: - We're running Kafka 2.0.0 in production and just ran into the same issue this morning. We had to restart the cluster in order to add a new user (also not a great mechanism for adding users). A really serious problem; it cost us 4 hours of work to bring all systems back up to date from a data-consistency standpoint. Any ETA on a fix for this? was (Author: practice): We're running Kafka 2.0.0 in production and just ran into the same issue this morning. A really serious problem; it cost us 4 hours of work to bring all systems back up to date from a data-consistency standpoint. Any ETA on a fix for this? > Consumer offsets lost during leadership rebalance after bringing node back > from clean shutdown > -- > > Key: KAFKA-7447 > URL: https://issues.apache.org/jira/browse/KAFKA-7447 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1, 2.0.0 >Reporter: Ben Isaacs >Priority: Major > > *Summary:* > * When 1 of my 3 brokers is cleanly shut down, consumption and production > continues as normal due to replication. (Consumers are rebalanced to the > replicas, and producers are rebalanced to the remaining brokers). However, > when the cleanly-shut-down broker comes back, after about 10 minutes, a > flurry of production errors occur and my consumers suddenly go back in time 2 > weeks, causing a long outage (12 hours+) as all messages are replayed on some > topics. > * The hypothesis is that the auto-leadership-rebalance is happening too > quickly after the downed broker returns, before it has had a chance to become > fully synchronised on all partitions. In particular, it seems that having > consumer offsets ahead of the most recent data on the topic that consumer was > following causes the consumer to be reset to 0. 
> *Expected:* > * bringing a node back from a clean shut down does not cause any consumers > to reset to 0. > *Actual:* > * I experience approximately 12 hours of partial outage triggered at the > point that auto leadership rebalance occurs, after a cleanly shut down node > returns. > *Workaround:* > * disable auto leadership rebalance entirely. > * manually rebalance it from time to time when all nodes and all partitions > are fully replicated. > *My Setup:* > * Kafka deployment with 3 brokers and 2 topics. > * Replication factor is 3, for all topics. > * min.isr is 2, for all topics. > * Zookeeper deployment with 3 instances. > * In the region of 10 to 15 consumers, with 2 user topics (and, of course, > the system topics such as consumer offsets). The consumer-offsets topic has > the standard 50 partitions. The user topics have about 3000 partitions in > total. > * Offset retention time of 7 days, and topic retention time of 14 days. > * Input rate ~1000 messages/sec. > * Deployment happens to be on Google compute engine. > *Related Stack Overflow Post:* > https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker > It was suggested I open a ticket by "Muir" who says they have also > experienced this. > *Transcription of logs, showing the problem:* > Below, you can see chronologically sorted, interleaved, logs from the 3 > brokers. prod-kafka-2 is the node which was cleanly shut down and then > restarted. I filtered the messages only to those regarding > __consumer_offsets-29 because it's just too much to paste otherwise. > ||Broker host||Broker ID|| > |prod-kafka-1|0| > |prod-kafka-2|1 (this one was restarted)| > |prod-kafka-3|2| > prod-kafka-2: (just starting up) > {code} > [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Based on follower's leader epoch, leader replied with an unknown > offset in __consumer_offsets-29. The initial fetch offset 0 will be used for > truncation. 
(kafka.server.ReplicaFetcherThread) > {code} > prod-kafka-3: (sees replica1 come back) > {code} > [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] > Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition) > {code} > prod-kafka-2: > {code} > [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling > unloading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished > unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached > groups. (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed >
[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924152#comment-16924152 ] Valentin Florea commented on KAFKA-7447: We're running Kafka 2.0.0 in production and just ran into the same issue this morning. A really serious problem; it cost us 4 hours of work to bring all systems back up to date from a data-consistency standpoint. Any ETA on a fix for this? > Consumer offsets lost during leadership rebalance after bringing node back > from clean shutdown > -- > > Key: KAFKA-7447 > URL: https://issues.apache.org/jira/browse/KAFKA-7447 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1, 2.0.0 >Reporter: Ben Isaacs >Priority: Major > > *Summary:* > * When 1 of my 3 brokers is cleanly shut down, consumption and production > continues as normal due to replication. (Consumers are rebalanced to the > replicas, and producers are rebalanced to the remaining brokers). However, > when the cleanly-shut-down broker comes back, after about 10 minutes, a > flurry of production errors occur and my consumers suddenly go back in time 2 > weeks, causing a long outage (12 hours+) as all messages are replayed on some > topics. > * The hypothesis is that the auto-leadership-rebalance is happening too > quickly after the downed broker returns, before it has had a chance to become > fully synchronised on all partitions. In particular, it seems that having > consumer offsets ahead of the most recent data on the topic that consumer was > following causes the consumer to be reset to 0. > *Expected:* > * bringing a node back from a clean shut down does not cause any consumers > to reset to 0. > *Actual:* > * I experience approximately 12 hours of partial outage triggered at the > point that auto leadership rebalance occurs, after a cleanly shut down node > returns. > *Workaround:* > * disable auto leadership rebalance entirely. 
> * manually rebalance it from time to time when all nodes and all partitions > are fully replicated. > *My Setup:* > * Kafka deployment with 3 brokers and 2 topics. > * Replication factor is 3, for all topics. > * min.isr is 2, for all topics. > * Zookeeper deployment with 3 instances. > * In the region of 10 to 15 consumers, with 2 user topics (and, of course, > the system topics such as consumer offsets). The consumer-offsets topic has > the standard 50 partitions. The user topics have about 3000 partitions in > total. > * Offset retention time of 7 days, and topic retention time of 14 days. > * Input rate ~1000 messages/sec. > * Deployment happens to be on Google compute engine. > *Related Stack Overflow Post:* > https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker > It was suggested I open a ticket by "Muir" who says they have also > experienced this. > *Transcription of logs, showing the problem:* > Below, you can see chronologically sorted, interleaved, logs from the 3 > brokers. prod-kafka-2 is the node which was cleanly shut down and then > restarted. I filtered the messages only to those regarding > __consumer_offsets-29 because it's just too much to paste otherwise. > ||Broker host||Broker ID|| > |prod-kafka-1|0| > |prod-kafka-2|1 (this one was restarted)| > |prod-kafka-3|2| > prod-kafka-2: (just starting up) > {code} > [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Based on follower's leader epoch, leader replied with an unknown > offset in __consumer_offsets-29. The initial fetch offset 0 will be used for > truncation. 
(kafka.server.ReplicaFetcherThread) > {code} > prod-kafka-3: (sees replica1 come back) > {code} > [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] > Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition) > {code} > prod-kafka-2: > {code} > [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling > unloading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished > unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached > groups. (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions __consumer_offsets-29 > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] > __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous > Leader Epoch was: 77 (kafka.cluster.Partition) > [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling > loading of offsets and group metadata from
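The workaround listed in the report (disable auto leadership rebalance, then rebalance manually once everything is in sync) corresponds to a single broker setting; a sketch of the relevant server.properties fragment, assuming an otherwise stock broker configuration:

```properties
# server.properties -- workaround sketch for this issue:
# disable automatic preferred-leader election so that it can instead be
# triggered manually once all partitions are fully replicated.
auto.leader.rebalance.enable=false
```

With this disabled, a manual preferred-leader election can be run from time to time with the kafka-preferred-replica-election.sh tool shipped with these broker versions.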
[jira] [Commented] (KAFKA-8660) Make ValueToKey SMT work only on a whitelist of topics
[ https://issues.apache.org/jira/browse/KAFKA-8660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924150#comment-16924150 ] Marc Löhe commented on KAFKA-8660: -- There hasn't been any progress on this one lately. What can I do to find a solution to the underlying problem? If it helps, I could try to write a KIP that allows configuring a whitelist of topics for any SMT. I could also just add a PR that implements it. It would be great to get some feedback from any of the maintainers here. > Make ValueToKey SMT work only on a whitelist of topics > -- > > Key: KAFKA-8660 > URL: https://issues.apache.org/jira/browse/KAFKA-8660 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Marc Löhe >Priority: Minor > Labels: needs-kip > > For source connectors that publish on multiple topics, it is essential to be > able to configure transforms to be active only for certain topics. I'll add a > PR to implement this on the example of the ValueToKey SMT. > I'm also interested in opinions on whether this would make sense to add as a > configurable option to all packaged SMTs or even as a capability for SMTs in > general. -- This message was sent by Atlassian Jira (v8.3.2#803003)
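As a sketch of what the proposed configuration might look like: the `transforms.extractKey.topics` key below is hypothetical (it is the whitelist option this ticket asks for, not an existing Connect setting); the other keys are standard ValueToKey configuration, and the topic and field names are made up for illustration:

```properties
# connector.properties (sketch).
transforms=extractKey
transforms.extractKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.extractKey.fields=id
# Hypothetical, proposed by this ticket: apply the SMT only to these topics
# and pass records from all other topics through untouched.
transforms.extractKey.topics=orders,customers
```

Without such an option, the transform applies to every record the connector produces, regardless of topic.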
[jira] [Created] (KAFKA-8882) It's not possible to restart Kafka Streams using StateListener
Jakub created KAFKA-8882: Summary: It's not possible to restart Kafka Streams using StateListener Key: KAFKA-8882 URL: https://issues.apache.org/jira/browse/KAFKA-8882 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.2.1 Environment: Linux, Windows Reporter: Jakub Upon problems connecting to a Kafka cluster, services using Kafka Streams stop working with the following error message: {code:java} Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors (...) Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout (...) State transition from PENDING_SHUTDOWN to DEAD (...) All stream threads have died. The instance will be in error state and should be closed. {code} We tried to use a StateListener to automatically detect and work around this problem. However, this approach doesn't seem to work correctly: # KafkaStreams correctly transitions from status Error to Pending Shutdown, but then it stays in this status forever. # Occasionally, after registering a listener the status doesn't even change to Error.
{code:java}
kafkaStreams.setStateListener(new StateListener() {
    public void onChange(State stateNew, State stateOld) {
        if (stateNew == State.ERROR) {
            kafkaStreams.cleanUp();
            kafkaStreams.close();
        } else if (stateNew == State.PENDING_SHUTDOWN) {
            // this message is displayed, and then nothing else happens
            LOGGER.info("State is PENDING_SHUTDOWN");
        } else if (stateNew == State.NOT_RUNNING) {
            // it never gets here
            kafkaStreams = createKafkaStreams();
            kafkaStreams.start();
        }
    }
});
{code}
Surprisingly, restarting KafkaStreams outside of a listener works fine. I'm happy to provide more details if required. -- This message was sent by Atlassian Jira (v8.3.2#803003)
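A likely explanation, consistent with "restarting outside of a listener works fine": KafkaStreams.close() waits for the stream threads to terminate, and the state listener callback runs on one of those threads, so calling close() from inside the callback can hang in PENDING_SHUTDOWN. A minimal sketch of the alternative pattern, where the listener only signals and a separate thread performs the close-and-restart. The `Streams` class and `enterErrorState()` below are stand-ins (not Kafka API) so the pattern runs without a broker:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: the state listener only records that a restart is needed; a
// dedicated thread performs close() and the restart. This avoids blocking
// inside the callback, which is invoked by the very threads close() joins.
public class RestartOutsideListener {
    // Stand-in for KafkaStreams, used only to demonstrate the pattern.
    static class Streams {
        private Runnable listener;
        void setStateListener(Runnable l) { listener = l; }
        void enterErrorState() { listener.run(); } // simulates the ERROR transition
        void close() { }
        void start() { }
    }

    static final AtomicInteger restarts = new AtomicInteger();

    public static void runOnce() {
        CountDownLatch restartRequested = new CountDownLatch(1);
        Streams streams = new Streams();

        // The listener does not call close(); it only signals.
        streams.setStateListener(restartRequested::countDown);

        Thread restarter = new Thread(() -> {
            try {
                restartRequested.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            streams.close();            // safe: this is not a stream thread
            restarts.incrementAndGet(); // stands in for creating and starting a new instance
        });
        restarter.start();

        streams.enterErrorState();      // ERROR observed -> latch released
        try {
            restarter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In real code the restarter thread would build a fresh KafkaStreams instance and call start() on it after close() returns on the old one.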
[jira] [Commented] (KAFKA-8818) CreatePartitions Request protocol documentation
[ https://issues.apache.org/jira/browse/KAFKA-8818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16924019#comment-16924019 ] Fábio Silva commented on KAFKA-8818: The syntax in the documentation is wrong. To represent an array, the correct syntax is brackets [], not ARRAY(). "In protocol documentation an array of T instances is referred to as [T]", says the protocol definition page. > CreatePartitions Request protocol documentation > --- > > Key: KAFKA-8818 > URL: https://issues.apache.org/jira/browse/KAFKA-8818 > Project: Kafka > Issue Type: Bug > Components: documentation >Reporter: Fábio Silva >Priority: Major > Labels: documentation, protocol-documentation > > The CreatePartitions Request protocol documentation contains an invalid type, > ARRAY(INT32), for the assignment field; it must be INT32. > Wrong: > {code:java} > assignment => ARRAY(INT32){code} > Correct: > {code:java} > assignment => INT32 > {code} -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8881) Measure thread running time precisely
[ https://issues.apache.org/jira/browse/KAFKA-8881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huxihx updated KAFKA-8881: -- Description: Currently, the code uses `System.currentTimeMillis()` extensively to measure timeouts. However, many situations, such as GC pauses and context switches, can suspend a thread. In such cases, the timeout value we specify is not strictly honored. I believe many of the flaky tests that fail with timeouts are a result of this. Maybe we should use ThreadMXBean#getCurrentThreadUserTime to measure thread running time precisely. (was: Currently, the code uses `System.currentTimeMillis()` extensively to measure timeouts. However, many situations, such as GC pauses and context switches, can suspend a thread. In such cases, the timeout value we specify is not strictly honored. Maybe we could use ThreadMXBean#getCurrentThreadUserTime to measure thread running time precisely.) > Measure thread running time precisely > - > > Key: KAFKA-8881 > URL: https://issues.apache.org/jira/browse/KAFKA-8881 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.4.0 >Reporter: huxihx >Priority: Major > Labels: needs-discussion > > Currently, the code uses `System.currentTimeMillis()` extensively to measure > timeouts. However, many situations, such as GC pauses and context switches, > can suspend a thread. In such cases, the timeout value we specify is not > strictly honored. I believe many of the flaky tests that fail with timeouts > are a result of this. Maybe we should use ThreadMXBean#getCurrentThreadUserTime > to measure thread running time precisely. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8881) Measure thread running time precisely
[ https://issues.apache.org/jira/browse/KAFKA-8881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huxihx updated KAFKA-8881: -- Description: Currently, the code uses `System.currentTimeMillis()` extensively to measure timeouts. However, many situations, such as GC pauses and context switches, can suspend a thread. In such cases, the timeout value we specify is not strictly honored. Maybe we could use ThreadMXBean#getCurrentThreadUserTime to measure thread running time precisely. (was: Currently, the code uses `System.currentTimeMillis()` to measure timeouts. However, many situations, such as GC pauses and context switches, can suspend a thread. In such cases, the timeout value we specify is not strictly honored. Maybe we could use ThreadMXBean#getCurrentThreadUserTime to measure thread running time precisely.) > Measure thread running time precisely > - > > Key: KAFKA-8881 > URL: https://issues.apache.org/jira/browse/KAFKA-8881 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.4.0 >Reporter: huxihx >Priority: Major > Labels: needs-discussion > > Currently, the code uses `System.currentTimeMillis()` extensively to measure > timeouts. However, many situations, such as GC pauses and context switches, > can suspend a thread. In such cases, the timeout value we specify is not > strictly honored. Maybe we could use ThreadMXBean#getCurrentThreadUserTime to > measure thread running time precisely. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8881) Measure thread running time precisely
[ https://issues.apache.org/jira/browse/KAFKA-8881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huxihx updated KAFKA-8881: -- Labels: needs-discussion (was: ) > Measure thread running time precisely > - > > Key: KAFKA-8881 > URL: https://issues.apache.org/jira/browse/KAFKA-8881 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.4.0 >Reporter: huxihx >Priority: Major > Labels: needs-discussion > > Currently, the code uses `System.currentTimeMillis()` to measure timeouts. > However, many situations, such as GC pauses and context switches, can suspend > a thread. In such cases, the timeout value we specify is not strictly honored. > Maybe we could use ThreadMXBean#getCurrentThreadUserTime to measure thread > running time precisely. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8881) Measure thread running time precisely
huxihx created KAFKA-8881: - Summary: Measure thread running time precisely Key: KAFKA-8881 URL: https://issues.apache.org/jira/browse/KAFKA-8881 Project: Kafka Issue Type: Improvement Affects Versions: 2.4.0 Reporter: huxihx Currently, the code uses `System.currentTimeMillis()` to measure timeouts. However, many situations, such as GC pauses and context switches, can suspend a thread. In such cases, the timeout value we specify is not strictly honored. Maybe we could use ThreadMXBean#getCurrentThreadUserTime to measure thread running time precisely. -- This message was sent by Atlassian Jira (v8.3.2#803003)
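The proposal above can be sketched as follows. This is a minimal illustration of ThreadMXBean#getCurrentThreadUserTime (user-mode CPU time, which does not advance while the thread is suspended by GC or descheduled), not Kafka code; the class and method names are made up for the example:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Sketch of the idea in this ticket: measure the current thread's *user* CPU
// time instead of wall-clock time, so GC pauses and context switches do not
// count against a timeout.
public class ThreadTimer {
    private static final ThreadMXBean BEAN = ManagementFactory.getThreadMXBean();

    // User-mode CPU time of the current thread in nanoseconds, or -1 when
    // the JVM does not support per-thread CPU timing.
    public static long userTimeNanos() {
        return BEAN.isCurrentThreadCpuTimeSupported()
                ? BEAN.getCurrentThreadUserTime()
                : -1L;
    }

    // Measures user time consumed by a short busy loop; always >= 0
    // (0 when per-thread timing is unsupported or below timer granularity).
    public static long burnSomeCpu() {
        long start = userTimeNanos();
        long acc = 0;
        for (int i = 0; i < 5_000_000; i++) {
            acc += i; // busy work executed in user mode
        }
        if (acc < 0) throw new AssertionError(); // keep the loop observable
        return userTimeNanos() - start;
    }
}
```

One caveat for the flaky-test use case: getCurrentThreadUserTime typically has much coarser granularity than System.nanoTime(), so very short intervals may report 0.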
[jira] [Resolved] (KAFKA-8590) Replace TxnOffsetCommitRequest request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-8590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8590. Resolution: Fixed > Replace TxnOffsetCommitRequest request/response with automated protocol > --- > > Key: KAFKA-8590 > URL: https://issues.apache.org/jira/browse/KAFKA-8590 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8590) Replace TxnOffsetCommitRequest request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-8590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923951#comment-16923951 ] ASF GitHub Bot commented on KAFKA-8590: --- hachikuji commented on pull request #6994: KAFKA-8590: use automated protocol for txn commit and add test coverage for offset commit URL: https://github.com/apache/kafka/pull/6994 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Replace TxnOffsetCommitRequest request/response with automated protocol > --- > > Key: KAFKA-8590 > URL: https://issues.apache.org/jira/browse/KAFKA-8590 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Resolved] (KAFKA-8822) Remove CompletedFetch type from Fetcher
[ https://issues.apache.org/jira/browse/KAFKA-8822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8822. Resolution: Fixed > Remove CompletedFetch type from Fetcher > --- > > Key: KAFKA-8822 > URL: https://issues.apache.org/jira/browse/KAFKA-8822 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Sean Glover >Assignee: Sean Glover >Priority: Minor > > In KAFKA-7548 we re-factored the {{Fetcher}} to create an instance of > {{PartitionRecords}} immediately in the fetch response handler of > {{Fetcher.sendFetches}}. The instance variable {{Fetcher.completedFetches}} > had its type changed to {{ConcurrentLinkedQueue}}, and therefore there is no > longer any need to keep completed fetch partition data in a superfluous type > ({{CompletedFetch}}). -- This message was sent by Atlassian Jira (v8.3.2#803003)