[jira] [Resolved] (KAFKA-8392) Kafka broker leaks metric when partition leader moves to another node.
[ https://issues.apache.org/jira/browse/KAFKA-8392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-8392. -- Resolution: Fixed Fix Version/s: 2.4.0 > Kafka broker leaks metric when partition leader moves to another node. > -- > > Key: KAFKA-8392 > URL: https://issues.apache.org/jira/browse/KAFKA-8392 > Project: Kafka > Issue Type: Bug > Components: metrics >Affects Versions: 2.2.0 >Reporter: Kamal Chandraprakash >Assignee: Tu Tran >Priority: Major > Fix For: 2.4.0 > > > When a partition leader moves from one node to another due to an imbalance in > leader.imbalance.per.broker.percentage, the old leader broker still emits the > static metric value. > Steps to reproduce: > 1. Create a cluster with 3 nodes. > 2. Create a topic with 2 partitions and RF=3 > 3. Generate some data using the console producer. > 4. Move any one of the partition from one node to another using > reassign-partitions and preferred-replica-election script. > 5. Generate some data using the console producer. > 6. Now all the 3 nodes emit bytesIn, bytesOut and MessagesIn for that topic. > Is it the expected behavior? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8392) Kafka broker leaks metric when partition leader moves to another node.
[ https://issues.apache.org/jira/browse/KAFKA-8392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889211#comment-16889211 ] ASF GitHub Bot commented on KAFKA-8392: --- guozhangwang commented on pull request #6977: KAFKA-8392: Fix old metrics leakage by brokers that have no leadership over any partition for a topic URL: https://github.com/apache/kafka/pull/6977 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka broker leaks metric when partition leader moves to another node. > -- > > Key: KAFKA-8392 > URL: https://issues.apache.org/jira/browse/KAFKA-8392 > Project: Kafka > Issue Type: Bug > Components: metrics >Affects Versions: 2.2.0 >Reporter: Kamal Chandraprakash >Assignee: Tu Tran >Priority: Major > > When a partition leader moves from one node to another due to an imbalance in > leader.imbalance.per.broker.percentage, the old leader broker still emits the > static metric value. > Steps to reproduce: > 1. Create a cluster with 3 nodes. > 2. Create a topic with 2 partitions and RF=3 > 3. Generate some data using the console producer. > 4. Move any one of the partition from one node to another using > reassign-partitions and preferred-replica-election script. > 5. Generate some data using the console producer. > 6. Now all the 3 nodes emit bytesIn, bytesOut and MessagesIn for that topic. > Is it the expected behavior? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
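The leak pattern described in KAFKA-8392 can be sketched with a toy registry. This is illustrative only (class and method names are hypothetical, not the broker's actual metrics code): per-topic metrics live in a registry keyed by topic name, and unless the broker deregisters them when it loses leadership for a topic, the old leader keeps reporting the last recorded, now static, value.

```java
// Toy sketch -- models the leak, not Kafka's real BrokerTopicMetrics.
class BrokerTopicMetricsRegistry {
    private final java.util.Map<String, Long> bytesInPerTopic =
            new java.util.concurrent.ConcurrentHashMap<>();

    // Called on the leader for every produced batch.
    void recordBytesIn(String topic, long bytes) {
        bytesInPerTopic.merge(topic, bytes, Long::sum);
    }

    // The fix amounts to invoking something like this once the broker
    // no longer leads any partition of the topic.
    void removeTopicMetrics(String topic) {
        bytesInPerTopic.remove(topic);
    }

    // A reporter (e.g. JMX) would export every entry still registered,
    // which is why a forgotten entry shows up as a leaked metric.
    boolean stillReports(String topic) {
        return bytesInPerTopic.containsKey(topic);
    }
}
```

With the cleanup call in place, only the current leaders emit bytesIn/bytesOut/messagesIn for the topic, matching the expected behavior the reporter asks about.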
[jira] [Commented] (KAFKA-8666) Improve Documentation on usage of Materialized config object
[ https://issues.apache.org/jira/browse/KAFKA-8666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889191#comment-16889191 ] Matthias J. Sax commented on KAFKA-8666: Seems some people think differently. Compare the linked Jira tickets. Feel free to comment on those, too. > Improve Documentation on usage of Materialized config object > > > Key: KAFKA-8666 > URL: https://issues.apache.org/jira/browse/KAFKA-8666 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Reporter: Bill Bejeck >Priority: Major > Labels: newbie > > When using the Materialized object if the user wants to name the statestore > with > {code:java} > Materialized.as("MyStoreName"){code} > then subsequently provide the key and value serde the calls to do so must > take the form of > {code:java} > Materialized.as("MyStoreName").withKeySerde(keySerde).withValueSerde(valueSerde) > {code} > If users do the following > {code:java} > Materialized.as("MyStoreName").with(keySerde, valueSerde) > {code} > the Materialized instance created by the "as(storeName)" call is replaced by > a new Materialized instance resulting from the "with(...)" call and any > configuration on the first Materialized instance is lost. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Resolved] (KAFKA-8663) partition assignment would be better original_assignment + new_reassignment during reassignments
[ https://issues.apache.org/jira/browse/KAFKA-8663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] GEORGE LI resolved KAFKA-8663. -- Resolution: Won't Fix It looks like RAR + OAR is required for KIP-455 to preserve the exact targetReplicas ordering, and the old replicas that need to be dropped are recorded in the new removingReplicas field of the /brokers/topics/ ZK node. > partition assignment would be better original_assignment + new_reassignment > during reassignments > > > Key: KAFKA-8663 > URL: https://issues.apache.org/jira/browse/KAFKA-8663 > Project: Kafka > Issue Type: Improvement > Components: controller, core >Affects Versions: 1.1.1, 2.3.0 >Reporter: GEORGE LI >Priority: Minor > > From my observation/experience during reassignment, the partition assignment > replica ordering gets changed, because it's the OAR + RAR (original replicas > + reassignment replicas) set union. > However, it seems like the preferred leaders changed during the > reassignments. Normally if there is no cluster preferred leader election, > the leader is still the old leader. But if during the reassignments, there > is a leader election, the leadership changes. This caused some side > effects. Let's look at this example. > {code} > Topic:georgeli_test PartitionCount:8ReplicationFactor:3 Configs: > Topic: georgeli_testPartition: 0Leader: 1026Replicas: > 1026,1028,1025Isr: 1026,1028,1025 > {code} > reassignment (1026,1028,1025) => (1027,1025,1028) > {code} > Topic:georgeli_test PartitionCount:8ReplicationFactor:4 > Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027 > Topic: georgeli_testPartition: 0Leader: 1026Replicas: > 1027,1025,1028,1026 Isr: 1026,1028,1025 > {code} > Notice the above: Leader remains 1026, but Replicas: 1027,1025,1028,1026. > If we run preferred leader election, it will try 1027 first, then 1025. > After 1027 is in ISR, then the final assignment will be (1027,1025,1028). 
> > My proposal for a minor improvement is to keep the original ordering replicas > during the reassignment (could be long for big topic/partitions). and after > all replicas in ISR, then finally set the partition assignment to New > reassignment. > {code} > val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ > controllerContext.partitionReplicaAssignment(topicPartition)).toSet > //1. Update AR in ZK with OAR + RAR. > updateAssignedReplicasForPartition(topicPartition, > newAndOldReplicas.toSeq) > {code} > above code changed to below to keep the original ordering first during > reassignment: > {code} > val newAndOldReplicas = > (controllerContext.partitionReplicaAssignment(topicPartition) ++ > reassignedPartitionContext.newReplicas).toSet > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
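The ordering effect described above comes down to which operand of the union is enumerated first. A small, self-contained Java sketch (using `LinkedHashSet` as an insertion-ordered stand-in for the controller's Scala `++ ... .toSet` union; replica IDs taken from the example in the ticket):

```java
import java.util.LinkedHashSet;
import java.util.List;

// Illustrative only, not controller code: a union that keeps first-seen
// ordering, so whichever operand comes first dictates the ordering of the
// merged replica list written back to ZK.
class ReplicaUnion {
    static List<Integer> union(List<Integer> first, List<Integer> second) {
        LinkedHashSet<Integer> merged = new LinkedHashSet<>(first);
        merged.addAll(second); // duplicates already in `first` are skipped
        return List.copyOf(merged);
    }
}
```

With OAR = (1026, 1028, 1025) and RAR = (1027, 1025, 1028), the current `RAR ++ OAR` union yields (1027, 1025, 1028, 1026) -- new replicas lead, matching the observed `Replicas:` line -- while the proposed `OAR ++ RAR` yields (1026, 1028, 1025, 1027), preserving the original ordering until the reassignment completes.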
[jira] [Created] (KAFKA-8693) Flakey test ErrorHandlingIntegrationTest #testSkipRetryAndDLQWithHeaders
Boyang Chen created KAFKA-8693: -- Summary: Flakey test ErrorHandlingIntegrationTest #testSkipRetryAndDLQWithHeaders Key: KAFKA-8693 URL: https://issues.apache.org/jira/browse/KAFKA-8693 Project: Kafka Issue Type: Bug Reporter: Boyang Chen [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/438/console]
ERROR: Failed to write output for test null.Gradle Test Executor 25
java.lang.NullPointerException: Cannot invoke method write() on null object
at org.codehaus.groovy.runtime.NullObject.invokeMethod(NullObject.java:91)
at org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.call(PogoMetaClassSite.java:47)
at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:47)
at org.codehaus.groovy.runtime.callsite.NullCallSite.call(NullCallSite.java:34)
at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:47)
at java_io_FileOutputStream$write.call(Unknown Source)
at build_ctl3wpaytn6q6nw5ya090go8u$_run_closure5$_closure75$_closure108.doCall(/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/build.gradle:244)
at jdk.internal.reflect.GeneratedMethodAccessor280.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:104)
at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:326)
at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:264)
at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1041)
at groovy.lang.Closure.call(Closure.java:411)
at org.gradle.listener.ClosureBackedMethodInvocationDispatch.dispatch(ClosureBackedMethodInvocationDispatch.java:40)
at org.gradle.listener.ClosureBackedMethodInvocationDispatch.dispatch(ClosureBackedMethodInvocationDispatch.java:25)
at org.gradle.internal.event.AbstractBroadcastDispatch.dispatch(AbstractBroadcastDispatch.java:42)
at org.gradle.internal.event.BroadcastDispatch$SingletonDispatch.dispatch(BroadcastDispatch.java:230)
at org.gradle.internal.event.BroadcastDispatch$SingletonDispatch.dispatch(BroadcastDispatch.java:149)
at org.gradle.internal.event.AbstractBroadcastDispatch.dispatch(AbstractBroadcastDispatch.java:58)
at org.gradle.internal.event.BroadcastDispatch$CompositeDispatch.dispatch(BroadcastDispatch.java:324)
at org.gradle.internal.event.BroadcastDispatch$CompositeDispatch.dispatch(BroadcastDispatch.java:234)
at org.gradle.internal.event.ListenerBroadcast.dispatch(ListenerBroadcast.java:140)
at org.gradle.internal.event.ListenerBroadcast.dispatch(ListenerBroadcast.java:37)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy73.onOutput(Unknown Source)
at org.gradle.api.internal.tasks.testing.results.TestListenerAdapter.output(TestListenerAdapter.java:56)
at jdk.internal.reflect.GeneratedMethodAccessor260.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.event.AbstractBroadcastDispatch.dispatch(AbstractBroadcastDispatch.java:42)
at org.gradle.internal.event.BroadcastDispatch$SingletonDispatch.dispatch(BroadcastDispatch.java:230)
at org.gradle.internal.event.BroadcastDispatch$SingletonDispatch.dispatch(BroadcastDispatch.java:149)
at org.gradle.internal.event.AbstractBroadcastDispatch.dispatch(AbstractBroadcastDispatch.java:58)
at org.gradle.internal.event.BroadcastDispatch$CompositeDispatch.dispatch(BroadcastDispatch.java:324)
at org.gradle.internal.event.BroadcastDispatch$CompositeDispatch.dispatch(BroadcastDispatch.java:234)
at org.gradle.internal.event.ListenerBroadcast.dispatch(ListenerBroadcast.java:140)
at org.gradle.internal.event.ListenerBroadcast.dispatch(ListenerBroadcast.java:37)
at
[jira] [Created] (KAFKA-8692) Transient failure in kafka.api.SaslScramSslEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
Bill Bejeck created KAFKA-8692: -- Summary: Transient failure in kafka.api.SaslScramSslEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl Key: KAFKA-8692 URL: https://issues.apache.org/jira/browse/KAFKA-8692 Project: Kafka Issue Type: Bug Components: core, unit tests Reporter: Bill Bejeck Failed in build [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/420/] {noformat} Error Message org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records Stacktrace org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at org.scalatest.Assertions.fail(Assertions.scala:1091) at org.scalatest.Assertions.fail$(Assertions.scala:1087) at org.scalatest.Assertions$.fail(Assertions.scala:1389) at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:822) at kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:781) at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1309) at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1317) at kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:522) at kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:361) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.ParentRunner.run(ParentRunner.java:412) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at jdk.internal.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at
[jira] [Updated] (KAFKA-8691) Flakey test ProcessorContextTest#shouldNotAllowToScheduleZeroMillisecondPunctuation
[ https://issues.apache.org/jira/browse/KAFKA-8691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-8691: --- Component/s: streams > Flakey test > ProcessorContextTest#shouldNotAllowToScheduleZeroMillisecondPunctuation > > > Key: KAFKA-8691 > URL: https://issues.apache.org/jira/browse/KAFKA-8691 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Boyang Chen >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6384/consoleFull] > org.apache.kafka.streams.processor.internals.ProcessorContextTest > > shouldNotAllowToScheduleZeroMillisecondPunctuation PASSED > ERROR: Failed to write output for test null.Gradle Test Executor 5 > java.lang.NullPointerException: Cannot invoke method write() on null object > at org.codehaus.groovy.runtime.NullObject.invokeMethod(NullObject.java:91) > at org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.call(PogoMetaClassSite.java:47) > at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:47) > at org.codehaus.groovy.runtime.callsite.NullCallSite.call(NullCallSite.java:34) > at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:47) > at java_io_FileOutputStream$write.call(Unknown Source) > at build_5nv3fyjgqff9aim9wbxfnad9z$_run_closure5$_closure75$_closure108.doCall(/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/build.gradle:244) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8666) Improve Documentation on usage of Materialized config object
[ https://issues.apache.org/jira/browse/KAFKA-8666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889138#comment-16889138 ] Alexander Zafirov commented on KAFKA-8666: -- Looking at the JavaDoc I think that it is elaborate enough {code:java} // Materialize a {@link StateStore} with the provided key and value {@link Serde}s. * An internal name will be used for the store. {code} Let's hope that people are using modern IDEs that discourage calling the static methods on `Materialized` objects. From my standpoint I think this issue should be closed. Hope our discussion is useful for the people out there. Thank you, Matthias. > Improve Documentation on usage of Materialized config object > > > Key: KAFKA-8666 > URL: https://issues.apache.org/jira/browse/KAFKA-8666 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Reporter: Bill Bejeck >Priority: Major > Labels: newbie > > When using the Materialized object if the user wants to name the statestore > with > {code:java} > Materialized.as("MyStoreName"){code} > then subsequently provide the key and value serde the calls to do so must > take the form of > {code:java} > Materialized.as("MyStoreName").withKeySerde(keySerde).withValueSerde(valueSerde) > {code} > If users do the following > {code:java} > Materialized.as("MyStoreName").with(keySerde, valueSerde) > {code} > the Materialized instance created by the "as(storeName)" call is replaced by > a new Materialized instance resulting from the "with(...)" call and any > configuration on the first Materialized instance is lost. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
[ https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889109#comment-16889109 ] Boyang Chen commented on KAFKA-8677: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6392/console] > Flakey test > GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > > > Key: KAFKA-8677 > URL: https://issues.apache.org/jira/browse/KAFKA-8677 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console] > > kafka.api.GroupEndToEndAuthorizationTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED > kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout > > kafka.api.GroupEndToEndAuthorizationTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED > org.scalatest.exceptions.TestFailedException: Consumed 0 records before > timeout instead of the expected 1 records -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Resolved] (KAFKA-8513) Add kafka-streams-application-reset.bat for Windows platform
[ https://issues.apache.org/jira/browse/KAFKA-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-8513. Resolution: Fixed Fix Version/s: 2.4.0 > Add kafka-streams-application-reset.bat for Windows platform > > > Key: KAFKA-8513 > URL: https://issues.apache.org/jira/browse/KAFKA-8513 > Project: Kafka > Issue Type: Improvement > Components: streams, tools >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Minor > Fix For: 2.4.0 > > > For improving Windows support, it'd be nice if there were a batch file > corresponding to bin/kafka-streams-application-reset.sh. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8666) Improve Documentation on usage of Materialized config object
[ https://issues.apache.org/jira/browse/KAFKA-8666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889058#comment-16889058 ] Matthias J. Sax commented on KAFKA-8666: {quote}we are throwing away information that the user already provided if they choose to build their object {quote} Not sure if I agree with this statement. Assume there would not be any static methods, but a public constructor for `Materialized`: {code:java} // current API builder.table(..., Materialized.as("storeName").withKeySerde(keySerde).withValueSerde(valueSerde)); // or builder.table(..., Materialized.with(keySerde, valueSerde)); // if we would not have static methods but public constructors it would be like this: builder.table(..., new Materialized("storeName").withKeySerde(keySerde).withValueSerde(valueSerde)); // or builder.table(..., new Materialized(keySerde, valueSerde));{code} If you call {code:java} builder.table(..., Materialized.as("storeName").with(keySerde, valueSerde));{code} It's similar to calling "new" twice and using the second object: {code:java} Materialized m; m = new Materialized("storeName"); m = new Materialized(keySerde, valueSerde); builder.table(..., m);{code} Hence, KS does not "throw away" information IMHO. For the "new" case it's obvious that the second object will not have a store name set – it's two objects after all. It might be somewhat subtle, but calling a static method in a builder-like pattern is like calling "new" (and a modern IDE should actually give you a warning about it – a static method should never be called on an object, but only on the class). {quote}Before going into solution mode I'd like to agree on something - we _do_ want the described functionality to be available, right? As you said we are using the `builder pattern` for that reason. Because in the commit message of the code says _Add a `with(Serde keySerde, Serde valSerde)` to `Materialized` for cases where people don't care about the state store name._ {quote} Yes. 
There are cases for which only passing in the Serdes is the best way, and passing in a store name (which forces store materialization and disables store optimization) is undesired. {quote}That leads me to believe that maybe that the code is working as it is supposed and we should focus and highlighting that to the developer rather than finding a way to comply with the request in this issue. {quote} Agreed. {quote}If we do want continue `building` the the `Materialized` object please enlighten me what is the standard practise with Kafka APIs with regards to backwards compatibility? My intuition was that we shouldn't break code that relies on the current implementation - which will happen if we attempt to provide a fix for the way the current `with(keySerde, valueSerde)` is done. If that is not the case then a way to fix the problem is to remove the `static` identifier of the method. That will allows us to access previously provided `storeName`. {quote} Yes, code must be backward compatible. We could add a non-static `withStoreName(String)` method that allows you to do: {code:java} Materialized.with(keySerde, valueSerde).withStoreName("storeName");{code} But there would still be the problem that one could still do it wrong: {code:java} Materialized.with(keySerde, valueSerde).as("storeName");{code} Therefore, I don't see a big advantage to adding a `withStoreName(String)` method. We can update the JavaDocs of course, but if people write "correct" Java code and don't call static methods on objects, they do it correctly automatically. 
> Improve Documentation on usage of Materialized config object > > > Key: KAFKA-8666 > URL: https://issues.apache.org/jira/browse/KAFKA-8666 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Reporter: Bill Bejeck >Priority: Major > Labels: newbie > > When using the Materialized object if the user wants to name the statestore > with > {code:java} > Materialized.as("MyStoreName"){code} > then subsequently provide the key and value serde the calls to do so must > take the form of > {code:java} > Materialized.as("MyStoreName").withKeySerde(keySerde).withValueSerde(valueSerde) > {code} > If users do the following > {code:java} > Materialized.as("MyStoreName").with(keySerde, valueSerde) > {code} > the Materialized instance created by the "as(storeName)" call is replaced by > a new Materialized instance resulting from the "with(...)" call and any > configuration on the first Materialized instance is lost. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
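Matthias's point above -- that `with(...)` is a static factory and therefore creates a brand-new object regardless of the receiver it is (mis)called on -- can be demonstrated with a toy builder. All names below are hypothetical stand-ins for illustration, not Kafka's actual `Materialized` (serdes are modeled as plain Strings to keep it self-contained):

```java
// Toy re-implementation mimicking Materialized's mix of static factories
// (`as`, `with`) and instance builders (`withKeySerde`, `withValueSerde`).
class ToyMaterialized {
    final String storeName;
    final String keySerde;
    final String valueSerde;

    private ToyMaterialized(String storeName, String keySerde, String valueSerde) {
        this.storeName = storeName;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Static factory: like Materialized.as(storeName).
    static ToyMaterialized as(String storeName) {
        return new ToyMaterialized(storeName, null, null);
    }

    // Static factory: builds a FRESH instance. Invoking it through an
    // instance expression compiles (with a warning), but the receiver --
    // and any store name it carried -- is ignored entirely.
    static ToyMaterialized with(String keySerde, String valueSerde) {
        return new ToyMaterialized(null, keySerde, valueSerde);
    }

    // Instance builders: these DO carry the store name forward.
    ToyMaterialized withKeySerde(String keySerde) {
        return new ToyMaterialized(storeName, keySerde, valueSerde);
    }

    ToyMaterialized withValueSerde(String valueSerde) {
        return new ToyMaterialized(storeName, keySerde, valueSerde);
    }
}
```

So `as("MyStoreName").withKeySerde(k).withValueSerde(v)` keeps the store name, while `as("MyStoreName").with(k, v)` silently discards it -- exactly the pitfall this ticket asks to document.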
[jira] [Commented] (KAFKA-7849) Warning when adding GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-7849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889046#comment-16889046 ] Matthias J. Sax commented on KAFKA-7849: That sounds correct. > Warning when adding GlobalKTable > > > Key: KAFKA-7849 > URL: https://issues.apache.org/jira/browse/KAFKA-7849 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Dmitry Minkovsky >Assignee: Omar Al-Safi >Priority: Minor > Labels: newbie > > Per > https://lists.apache.org/thread.html/59c119be8a2723c501e0653fa3ed571e8c09be40d5b5170c151528b5@%3Cusers.kafka.apache.org%3E > > When I add a GlobalKTable for topic "message-write-service-user-ids-by-email" > to my topology, I get this warning: > > [2019-01-19 12:18:14,008] WARN > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:421) > [Consumer > clientId=message-write-service-55f2ca4d-0efc-4344-90d3-955f9f5a65fd-StreamThread-2-consumer, > groupId=message-write-service] The following subscribed topics are not > assigned to any members: [message-write-service-user-ids-by-email] -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8687) Pass store name when creating a Materialized using with(keySerde, valueSerde)
[ https://issues.apache.org/jira/browse/KAFKA-8687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889045#comment-16889045 ] Matthias J. Sax commented on KAFKA-8687: Why is `Materialized.as("myStoreName").withKeySerde(...).withValueSerde(...)` not sufficient? > Pass store name when creating a Materialized using with(keySerde, valueSerde) > - > > Key: KAFKA-8687 > URL: https://issues.apache.org/jira/browse/KAFKA-8687 > Project: Kafka > Issue Type: Wish > Components: streams >Affects Versions: 2.3.0 >Reporter: jmhostalet >Priority: Minor > > current implementation of Materialized does not permit setting the name when > using with(keySerde, valueSerde) > {code:java} > public static <K, V, S extends StateStore> Materialized<K, V, S> > with(Serde<K> keySerde, Serde<V> valueSerde) { > return (new > Materialized<K, V, S>((String) null)).withKeySerde(keySerde).withValueSerde(valueSerde); > } > {code} > it would be nice to have such a feature, for example: > {code:java} > public static <K, V, S extends StateStore> Materialized<K, V, S> > with(Serde<K> keySerde, Serde<V> valueSerde) { > return with((String) null, keySerde, valueSerde); > } > public static <K, V, S extends StateStore> Materialized<K, V, S> with(String > name, Serde<K> keySerde, Serde<V> valueSerde) { > return (new > Materialized<K, V, S>(name)).withKeySerde(keySerde).withValueSerde(valueSerde); > } > {code} > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8690) Flakey test ConnectWorkerIntegrationTest#testAddAndRemoveWorker
Boyang Chen created KAFKA-8690: -- Summary: Flakey test ConnectWorkerIntegrationTest#testAddAndRemoveWorker Key: KAFKA-8690 URL: https://issues.apache.org/jira/browse/KAFKA-8690 Project: Kafka Issue Type: Bug Reporter: Boyang Chen [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23570/consoleFull]
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest > testAddAndRemoveWorker STARTED
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker.test.stdout
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest > testAddAndRemoveWorker FAILED
java.lang.AssertionError: Condition not met within timeout 15000. Connector tasks did not start in time.
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)
at org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker(ConnectWorkerIntegrationTest.java:118)
-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-7937) Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup
[ https://issues.apache.org/jira/browse/KAFKA-7937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888978#comment-16888978 ] Boyang Chen commented on KAFKA-7937: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23570/consoleFull] > Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup > > > Key: KAFKA-7937 > URL: https://issues.apache.org/jira/browse/KAFKA-7937 > Project: Kafka > Issue Type: Bug > Components: admin, clients, unit tests >Affects Versions: 2.2.0, 2.1.1, 2.3.0 >Reporter: Matthias J. Sax >Assignee: Gwen Shapira >Priority: Critical > Fix For: 2.4.0 > > Attachments: log-job6122.txt > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/19/pipeline > {quote}kafka.admin.ResetConsumerGroupOffsetTest > > testResetOffsetsNotExistingGroup FAILED > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available. at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:306) > at > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89) > Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: > The coordinator is not available.{quote} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Resolved] (KAFKA-8689) Cannot Name Join State Store Topics
[ https://issues.apache.org/jira/browse/KAFKA-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-8689. Resolution: Duplicate Duplicate of https://issues.apache.org/jira/browse/KAFKA-8558 > Cannot Name Join State Store Topics > --- > > Key: KAFKA-8689 > URL: https://issues.apache.org/jira/browse/KAFKA-8689 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: Simon Dean >Priority: Major > > Performing a join on two KStreams produces two state store topics. > Currently the names of the state store topics are auto-generated and cannot be > overridden.
[jira] [Commented] (KAFKA-8689) Cannot Name Join State Store Topics
[ https://issues.apache.org/jira/browse/KAFKA-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888974#comment-16888974 ] Bill Bejeck commented on KAFKA-8689: Hi [~msmsimondean] Thanks for filing a Jira on this item; your timing is impeccable. We currently have a KIP to remedy this situation - [https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join]. Here's the corresponding discussion thread so far: [https://www.mail-archive.com/dev@kafka.apache.org/msg98777.html]. Feel free to add any comments or questions you may have on the discussion thread. Here's the corresponding Jira: https://issues.apache.org/jira/browse/KAFKA-8558. I'm going to close this ticket as a duplicate of KAFKA-8558, and you'll be able to track progress on this issue via that ticket and the corresponding PR (coming soon). Thanks again for your interest. > Cannot Name Join State Store Topics > --- > > Key: KAFKA-8689 > URL: https://issues.apache.org/jira/browse/KAFKA-8689 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: Simon Dean >Priority: Major > > Performing a join on two KStreams produces two state store topics. > Currently the names of the state store topics are auto-generated and cannot be > overridden.
[jira] [Updated] (KAFKA-8558) KIP-479 - Add Materialized Overload to KStream#Join
[ https://issues.apache.org/jira/browse/KAFKA-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-8558: --- Description: To prevent a topology incompatibility with the release of 2.4 and the naming of Join operations, we'll add an overloaded KStream#join method accepting a Materialized parameter. This will allow users to explicitly name state stores created by Kafka Streams in the join operation. The overloads will apply to all flavors of KStream#join (inner, left, and right). was: To prevent a topology incompatibility with the release of 2.4 and the naming of Join operations, we'll add an overloaded KStream#join method accepting a Materialized parameter. The overloads will apply to all flavors of KStream#join (inner, left, and right). Additionally, new methods withQueryingEnabled and withQueryingDisabled are going to be added to Materialized. > KIP-479 - Add Materialized Overload to KStream#Join > > > Key: KAFKA-8558 > URL: https://issues.apache.org/jira/browse/KAFKA-8558 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > Labels: needs-kip > Fix For: 2.4.0 > > > To prevent a topology incompatibility with the release of 2.4 and the naming > of Join operations, we'll add an overloaded KStream#join method accepting a > Materialized parameter. This will allow users to explicitly name state stores > created by Kafka Streams in the join operation. > > The overloads will apply to all flavors of KStream#join (inner, left, and > right). -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (KAFKA-4790) Kafka cannot recover after a disk full
[ https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1628#comment-1628 ] bugnenno edited comment on KAFKA-4790 at 7/19/19 2:35 PM: -- This happens also in 2.1.1. I removed the property "log.index.size.max.bytes" from my custom settings and the error did not appear again. was (Author: bugnenno): This happens also in 2.1.1 > Kafka cannot recover after a disk full > -- > > Key: KAFKA-4790 > URL: https://issues.apache.org/jira/browse/KAFKA-4790 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.1, 0.10.1.1, 2.1.0 >Reporter: Pengwei >Priority: Major > Labels: reliability > Fix For: 0.11.0.0 > > > [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) > (org.I0Itec.zkclient.ZkClient) > [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager) > [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. > (kafka.log.Log) > [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads > during logs loading: java.lang.IllegalArgumentException: requirement failed: > Attempt to append to a full index (size = 128000). (kafka.log.LogManager) > [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. > Prepare to shutdown (kafka.server.KafkaServer) > java.lang.IllegalArgumentException: requirement failed: Attempt to append to > a full index (size = 128000). 
> at scala.Predef$.require(Predef.scala:219) > at > kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200) > at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199) > at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at kafka.log.OffsetIndex.append(OffsetIndex.scala:199) > at kafka.log.LogSegment.recover(LogSegment.scala:191) > at kafka.log.Log.recoverLog(Log.scala:259) > at kafka.log.Log.loadSegments(Log.scala:234) > at kafka.log.Log.(Log.scala:92) > at > kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
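The FATAL error above is a precondition failure: the offset index has a fixed capacity, and log recovery attempts to append past it. A simplified, self-contained sketch of that guard (the BoundedIndex class below is a hypothetical illustration, not Kafka's OffsetIndex):

```java
import java.util.ArrayList;
import java.util.List;

public class BoundedIndex {
    private final int maxEntries;
    private final List<Long> offsets = new ArrayList<>();

    public BoundedIndex(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    /** Appends an offset; fails like OffsetIndex.append when the index is full. */
    public void append(long offset) {
        if (offsets.size() >= maxEntries) {
            throw new IllegalArgumentException(
                "requirement failed: Attempt to append to a full index (size = " + offsets.size() + ").");
        }
        offsets.add(offset);
    }

    public int size() {
        return offsets.size();
    }

    public static void main(String[] args) {
        BoundedIndex index = new BoundedIndex(3);
        for (long offset = 0; offset < 3; offset++) {
            index.append(offset);
        }
        try {
            index.append(3);  // one entry past capacity, as during the failed recovery
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This also explains the commenter's observation: shrinking log.index.size.max.bytes shrinks the index capacity, making the full-index condition easier to hit during recovery.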
[jira] [Created] (KAFKA-8689) Cannot Name Join State Store Topics
Simon Dean created KAFKA-8689: - Summary: Cannot Name Join State Store Topics Key: KAFKA-8689 URL: https://issues.apache.org/jira/browse/KAFKA-8689 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.3.0 Reporter: Simon Dean

Performing a join on two KStreams produces two state store topics. Currently the names of the state store topics are auto-generated and cannot be overridden.

Example code:

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class JoinTopicNamesExample {

    public static void main(final String[] args) {
        // start() rather than run(), so the producer and the streams app run on their own threads
        new Thread(() -> produce(args)).start();
        new Thread(() -> {
            try {
                streams(args);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

    private static void produce(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        KafkaProducer<String, Long> producer = new KafkaProducer<>(props);
        for (long i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("left", Long.toString(i), i));
        }
        for (long i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("right", Long.toString(i), i));
        }
    }

    private static void streams(String[] args) throws InterruptedException {
        final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
        final Properties streamsConfiguration = new Properties();
        // Give the Streams application a unique name. The name must be unique in the Kafka cluster
        // against which the application is run.
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-topic-names-example");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "user-region-lambda-example-client");
        // Where to find Kafka broker(s).
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Specify default (de)serializers for record keys and for record values.
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        // Records should be flushed every 10 seconds. This is less than the default
        // in order to keep this example interactive.
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, Long> left = builder.stream("left", Consumed.with(stringSerde, longSerde));
        final KStream<String, Long> right = builder.stream("right", Consumed.with(stringSerde, longSerde));
        left.join(
            right,
            (value1, value2) -> value1 + value2,
            JoinWindows.of(Duration.ofHours(1)),
            Joined.as("sum"));
        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();
        Thread.sleep(TimeUnit.MINUTES.toMillis(1));
        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
{code}

Here are the topics produced by the example code: *
[jira] [Created] (KAFKA-8688) Upgrade system tests fail due to data loss with older message format
Rajini Sivaram created KAFKA-8688: - Summary: Upgrade system tests fail due to data loss with older message format Key: KAFKA-8688 URL: https://issues.apache.org/jira/browse/KAFKA-8688 Project: Kafka Issue Type: Bug Components: system tests Reporter: Rajini Sivaram Assignee: Rajini Sivaram

System test failure for TestUpgrade/test_upgrade: from_kafka_version=0.9.0.1, to_message_format_version=0.9.0.1, compression_types=.lz4

{code:java}
3 acked message did not make it to the Consumer. They are: [33906, 33900, 33903]. The first 3 missing messages were validated to ensure they are in Kafka's data files. 3 were missing. This suggests data loss. Here are some of the messages not found in the data files: [33906, 33900, 33903]
Traceback (most recent call last):
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 132, in run
    data = self.run_test()
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py", line 189, in run_test
    return self.test_context.function(self.test)
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/mark/_mark.py", line 428, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/core/upgrade_test.py", line 136, in test_upgrade
    self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version,
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/produce_consume_validate.py", line 112, in run_produce_consume_validate
    self.validate()
  File "/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/produce_consume_validate.py", line 135, in validate
    assert succeeded, error_msg
AssertionError: 3 acked message did not make it to the Consumer. They are: [33906, 33900, 33903]. The first 3 missing messages were validated to ensure they are in Kafka's data files. 3 were missing. This suggests data loss. Here are some of the messages not found in the data files: [33906, 33900, 33903]
{code}

Logs show:
# Broker 1 is leader of the partition
# Broker 2 successfully fetches from offset 10947 and processes the request
# Broker 2 sends a fetch request to broker 1 for offset 10950
# Broker 1 sets its HW to 10950 and acknowledges produce requests up to the HW
# Broker 2 is elected leader
# Broker 2 truncates to its local HW of 10947 - 3 messages are lost

This data loss is a known issue that was fixed under KIP-101. But since it can still happen with older message formats, we should update the upgrade tests to cope with some data loss.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
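The broker sequence above can be sketched as a small simulation: the follower's high watermark lags the leader's, so truncating to the local HW after leader election drops already-acknowledged messages (the pre-KIP-101 behavior). Class and method names below are illustrative, not broker code:

```java
import java.util.ArrayList;
import java.util.List;

public class HwTruncationDemo {
    /**
     * Simulates pre-KIP-101 follower truncation: a newly elected leader
     * truncates its log to its own high watermark, discarding any messages
     * it had replicated beyond it but not yet marked committed.
     */
    public static List<Integer> electAndTruncate(List<Integer> followerLog, int followerHw) {
        return new ArrayList<>(followerLog.subList(0, Math.min(followerHw, followerLog.size())));
    }

    public static void main(String[] args) {
        // Follower replicated offsets 0..10949 (up to the leader's HW of 10950),
        // but its own local HW is still 10947.
        List<Integer> log = new ArrayList<>();
        for (int offset = 0; offset < 10950; offset++) {
            log.add(offset);
        }
        List<Integer> afterElection = electAndTruncate(log, 10947);
        System.out.println("lost acked messages: " + (log.size() - afterElection.size()));
    }
}
```

KIP-101 replaced HW-based truncation with leader-epoch-based truncation, which is why the loss only surfaces here with the older message format.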
[jira] [Commented] (KAFKA-4790) Kafka cannot recover after a disk full
[ https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1628#comment-1628 ] bugnenno commented on KAFKA-4790: - This happens also in 2.1.1 -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8687) Pass store name when creating a Materialized using with(keySerde, valueSerde)
jmhostalet created KAFKA-8687: - Summary: Pass store name when creating a Materialized using with(keySerde, valueSerde) Key: KAFKA-8687 URL: https://issues.apache.org/jira/browse/KAFKA-8687 Project: Kafka Issue Type: Wish Components: streams Affects Versions: 2.3.0 Reporter: jmhostalet

The current implementation of Materialized does not permit setting the name when using with(keySerde, valueSerde):

{code:java}
public static <K, V, S extends StateStore> Materialized<K, V, S> with(Serde<K> keySerde, Serde<V> valueSerde) {
    return (new Materialized<K, V, S>((String) null)).withKeySerde(keySerde).withValueSerde(valueSerde);
}
{code}

It would be nice to have such a feature, for example:

{code:java}
public static <K, V, S extends StateStore> Materialized<K, V, S> with(Serde<K> keySerde, Serde<V> valueSerde) {
    return with((String) null, keySerde, valueSerde);
}

public static <K, V, S extends StateStore> Materialized<K, V, S> with(String name, Serde<K> keySerde, Serde<V> valueSerde) {
    return (new Materialized<K, V, S>(name)).withKeySerde(keySerde).withValueSerde(valueSerde);
}
{code}

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
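The wish follows the common delegating-overload pattern: the existing factory forwards to a new overload with a null name. A self-contained analogue of that pattern (StoreSpec is a hypothetical stand-in for Materialized, with plain strings in place of Serde instances):

```java
public class StoreSpec {
    private final String name;
    private String keySerde;
    private String valueSerde;

    private StoreSpec(String name) {
        this.name = name;
    }

    private StoreSpec withKeySerde(String keySerde) {
        this.keySerde = keySerde;
        return this;
    }

    private StoreSpec withValueSerde(String valueSerde) {
        this.valueSerde = valueSerde;
        return this;
    }

    /** Existing factory: a null name means "auto-generate the store name". */
    public static StoreSpec with(String keySerde, String valueSerde) {
        return with(null, keySerde, valueSerde);
    }

    /** Proposed overload: the caller supplies the store name explicitly. */
    public static StoreSpec with(String name, String keySerde, String valueSerde) {
        return new StoreSpec(name).withKeySerde(keySerde).withValueSerde(valueSerde);
    }

    public String name() {
        return name;
    }
}
```

The delegation keeps the old two-argument call site unchanged while letting new callers opt into an explicit name.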
[jira] [Resolved] (KAFKA-8646) Materialized.withLoggingDisabled() does not disable changelog topics creation
[ https://issues.apache.org/jira/browse/KAFKA-8646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jmhostalet resolved KAFKA-8646. --- Resolution: Not A Bug > Materialized.withLoggingDisabled() does not disable changelog topics creation > - > > Key: KAFKA-8646 > URL: https://issues.apache.org/jira/browse/KAFKA-8646 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: jmhostalet >Assignee: Bill Bejeck >Priority: Minor > > I have a cluster with 3 brokers running version 0.11 > My kafka-streams app was using kafka-client 0.11.0.1 but recently I've > migrated to 2.3.0 > I have no executed any migration as my data is disposable, therefore I have > deleted all intermediate topics, except input and output topics. > My streams config is: > {code:java} > application.id = consumer-id-v1.00 > application.server = > bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092] > buffered.records.per.partition = 1000 > cache.max.bytes.buffering = 524288000 > client.id = > commit.interval.ms = 3 > connections.max.idle.ms = 54 > default.deserialization.exception.handler = class > org.apache.kafka.streams.errors.LogAndFailExceptionHandler > default.key.serde = class > org.apache.kafka.common.serialization.Serdes$StringSerde > default.production.exception.handler = class > org.apache.kafka.streams.errors.DefaultProductionExceptionHandler > default.timestamp.extractor = class com.acme.stream.TimeExtractor > default.value.serde = class com.acme.serde.MyDtoSerde > max.task.idle.ms = 0 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > num.standby.replicas = 0 > num.stream.threads = 25 > partition.grouper = class > org.apache.kafka.streams.processor.DefaultPartitionGrouper > poll.ms = 100 > processing.guarantee = at_least_once > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > replication.factor = 1 > 
request.timeout.ms = 4 > retries = 0 > retry.backoff.ms = 100 > rocksdb.config.setter = null > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > state.cleanup.delay.ms = 60 > state.dir = /tmp/kafka-streams > topology.optimization = none > upgrade.from = null > windowstore.changelog.additional.retention.ms = 8640 > {code} > in my stream I am using withLoggingDisabled > {code:java} > stream.filter((key, val) -> val!=null) > .selectKey((key, val) -> getId(val)) > .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new > MyDtoSerde())) > .windowedBy(TimeWindows.of(aggregationWindowSizeDuration) >.grace(windowRetentionPeriodDuration)) > .aggregate(MyDto::new, >new MyUpdater(), >Materialized.as("aggregation-updater") >.withLoggingDisabled() >.with(Serdes.String(), new MyDtoSerde())) > .toStream((k, v) -> k.key()) > .mapValues(val -> { ... > {code} > but changelog topics are created (KSTREAM-AGGREGATE-STATE-STORE), no matter > if I delete them before running again the app or if I change the > application.id > With a new application.id, topics are recreated with the new prefix. > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8646) Materialized.withLoggingDisabled() does not disable changelog topics creation
[ https://issues.apache.org/jira/browse/KAFKA-8646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1601#comment-1601 ] jmhostalet commented on KAFKA-8646: --- Hi Bill, after a huge effort to match the generic types, it has worked thanks to your advice. Thank you very much! -- This message was sent by Atlassian JIRA (v7.6.14#76016)
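A likely explanation for the behavior in the issue description above is that with(...) is a static factory on Materialized, so calling it through the instance returned by withLoggingDisabled() compiles but builds a fresh object and silently discards the logging setting. A self-contained illustration of that Java pitfall (LoggingConfig is a hypothetical stand-in, not the Streams API):

```java
public class LoggingConfig {
    private final boolean loggingEnabled;

    private LoggingConfig(boolean loggingEnabled) {
        this.loggingEnabled = loggingEnabled;
    }

    /** Named factory, like Materialized.as(name): logging defaults to enabled. */
    public static LoggingConfig as() {
        return new LoggingConfig(true);
    }

    /** Returns a copy with changelogging turned off. */
    public LoggingConfig withLoggingDisabled() {
        return new LoggingConfig(false);
    }

    /** Static factory, like Materialized.with(...): always builds a fresh default config. */
    public static LoggingConfig with() {
        return new LoggingConfig(true);
    }

    public boolean loggingEnabled() {
        return loggingEnabled;
    }

    public static void main(String[] args) {
        // Compiles (with a warning), but the static call ignores its receiver:
        // the withLoggingDisabled() result is thrown away and logging stays enabled.
        LoggingConfig broken = LoggingConfig.as().withLoggingDisabled().with();
        // Keeping all configuration on one instance preserves the setting.
        LoggingConfig fixed = LoggingConfig.as().withLoggingDisabled();
        System.out.println("broken chain logging enabled: " + broken.loggingEnabled());
        System.out.println("fixed chain logging enabled: " + fixed.loggingEnabled());
    }
}
```

This matches the resolution as "Not A Bug": supplying the serdes via instance methods (withKeySerde/withValueSerde) instead of a trailing static with(...) keeps withLoggingDisabled() in effect.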
[jira] [Commented] (KAFKA-1111) Broker prematurely accepts TopicMetadataRequests on startup
[ https://issues.apache.org/jira/browse/KAFKA-1111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888789#comment-16888789 ] Sönke Liebau commented on KAFKA-1111: - I've not tested this, but there has been some work around TopicMetadataRequests creating topics. Most notably KAFKA-5291, which is at least related to this but certainly doesn't fix it. Looking at the date on this though, is this still an issue or can we close this? > Broker prematurely accepts TopicMetadataRequests on startup > --- > > Key: KAFKA-1111 > URL: https://issues.apache.org/jira/browse/KAFKA-1111 > Project: Kafka > Issue Type: Bug >Reporter: Jason Rosenberg >Assignee: Neha Narkhede >Priority: Major > > I have an issue where on startup, the broker starts accepting > TopicMetadataRequests before it has had metadata sync'd from the controller. > This results in a bunch of log entries that look like this: > 2013-11-01 03:26:01,577 INFO [kafka-request-handler-0] admin.AdminUtils$ - > Topic creation { "partitions":{ "0":[ 9, 10 ] }, "version":1 } > 2013-11-01 03:26:07,767 INFO [kafka-request-handler-1] admin.AdminUtils$ - > Topic creation { "partitions":{ "0":[ 9, 11 ] }, "version":1 } > 2013-11-01 03:26:07,823 INFO [kafka-request-handler-1] admin.AdminUtils$ - > Topic creation { "partitions":{ "0":[ 10, 11 ] }, "version":1 } > 2013-11-01 03:26:11,183 INFO [kafka-request-handler-2] admin.AdminUtils$ - > Topic creation { "partitions":{ "0":[ 10, 11 ] }, "version":1 } > From an email thread, Neha remarks: > Before a broker receives the first > LeaderAndIsrRequest/UpdateMetadataRequest, it is technically not ready to > start serving any request. But it still ends up serving > TopicMetadataRequest which can re-create topics accidentally. It shouldn't > succeed, but this is still a problem. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
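One way to frame a fix for the issue above is a readiness gate: requests that depend on cluster metadata are refused (or parked) until the first controller update arrives. A minimal sketch of the idea, using a hypothetical MetadataGate helper rather than actual broker code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class MetadataGate {
    private final CountDownLatch firstUpdate = new CountDownLatch(1);

    /** Invoked when the first LeaderAndIsrRequest/UpdateMetadataRequest is processed. */
    public void markReady() {
        firstUpdate.countDown();
    }

    /**
     * Waits up to timeoutMs for readiness. While this returns false, a real
     * broker would answer TopicMetadataRequests with a retriable error
     * instead of auto-creating topics from empty metadata.
     */
    public boolean awaitReady(long timeoutMs) throws InterruptedException {
        return firstUpdate.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

The gate is one-shot and thread-safe: markReady() may be called from the controller-handling thread while many request-handler threads block in awaitReady().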
[jira] [Commented] (KAFKA-1099) StopReplicaRequest and StopReplicaResponse should also carry the replica ids
[ https://issues.apache.org/jira/browse/KAFKA-1099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888786#comment-16888786 ] Sönke Liebau commented on KAFKA-1099: - Pretty sure this information has not been included in the requests until now (at least I cannot find it in the code), but as this is now 6 years old, is this still relevant, or has it become obsolete due to other changes in the meantime? > StopReplicaRequest and StopReplicaResponse should also carry the replica ids > > > Key: KAFKA-1099 > URL: https://issues.apache.org/jira/browse/KAFKA-1099 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.1 >Reporter: Neha Narkhede >Assignee: Neha Narkhede >Priority: Major > > The stop replica request and response only contain a list of partitions for > which a replica should be moved to offline/nonexistent state. But the replica > id information is implicit in the network layer as the receiving broker. This > complicates stop replica response handling on the controller. This blocks the > right fix for KAFKA-1097 since it requires invoking a callback when processing a > StopReplicaResponse and it requires knowing the replica id from the > StopReplicaResponse. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
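As a rough illustration of the proposal, the replica id would become an explicit field of the request/response rather than being inferred from which connection it arrived on. All names below are invented for the sketch and are not Kafka's actual wire format:

```java
// Hypothetical sketch only: make the target replica explicit in the message,
// so the controller can match responses to callbacks without consulting the
// network layer. Class and field names are illustrative.
final class StopReplicaRequestSketch {
    final int replicaId;   // explicit target replica: the addition proposed here
    final String topic;
    final int partition;

    StopReplicaRequestSketch(int replicaId, String topic, int partition) {
        this.replicaId = replicaId;
        this.topic = topic;
        this.partition = partition;
    }
}

class StopReplicaDemo {
    public static void main(String[] args) {
        StopReplicaRequestSketch req = new StopReplicaRequestSketch(3, "orders", 0);
        // The controller-side handler can now read the replica id from the
        // message itself (e.g. when invoking the callback needed for KAFKA-1097).
        System.out.println("stop replica " + req.replicaId + " for " + req.topic + "-" + req.partition);
    }
}
```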
[jira] [Commented] (KAFKA-1234) Allow kafka-run-class.sh to source in user config file (to set env vars like KAFKA_OPTS)
[ https://issues.apache.org/jira/browse/KAFKA-1234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888782#comment-16888782 ] Sönke Liebau commented on KAFKA-1234: - I'm unsure what the "gospel" way of doing this is, but my personal belief is that tools that run Kafka as a service should allow for management of environment variables. Most prevalent these days is probably systemd, which makes this very easy. I have limited experience with Upstart but do believe this is possible, if somewhat awkward. For init.d scripts I have not really found an acceptable solution apart from sourcing an env file from the init.d script, but as Kafka does not include these you would have to create them yourself anyway, so sourcing something should not create any conflicts with updates etc. Relying on a file in the Kafka base dir bears the risk of losing this file during an update, depending on how that update is performed, so I'd prefer relying on external methods. > Allow kafka-run-class.sh to source in user config file (to set env vars like > KAFKA_OPTS) > -- > > Key: KAFKA-1234 > URL: https://issues.apache.org/jira/browse/KAFKA-1234 > Project: Kafka > Issue Type: Improvement > Components: config >Affects Versions: 0.8.0 >Reporter: Alex Gray >Priority: Trivial > Attachments: patch.txt > > > We are using the distributed version of kafka: > http://apache.mirrors.lucidnetworks.net/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz > And we would like to set some environment variables, particularly KAFKA_OPTS > when the kafka service starts. > In other words, when someone does a "sudo service kafka start" we would like > to set some environment variables. > We cannot do this *without* modifying either /etc/init.d/kafka or > bin/kafka-run-class.sh, and we don't want to modify files that we don't own. > The solution is to have Kafka source a user-specific file that may have > these environment variables set. > I'm attaching the patch file to show you what I mean.
-- This message was sent by Atlassian JIRA (v7.6.14#76016)
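The idea in the attached patch can be sketched as a small shell fragment. The env-file location, its contents, and the demo setup here are illustrative stand-ins (the sketch creates its own temp file so it is self-contained), not the actual patch:

```shell
# Hypothetical sketch of the KAFKA-1234 idea: kafka-run-class.sh sources an
# optional user-provided env file, so KAFKA_OPTS can be set without editing
# files owned by the Kafka package. Path and contents below are stand-ins.
KAFKA_ENV_FILE="$(mktemp)"                                   # stand-in for e.g. /etc/kafka/kafka-env.sh
printf 'export KAFKA_OPTS="-Xms512M -Xmx1G"\n' > "$KAFKA_ENV_FILE"  # stand-in for the user's config

# The fragment kafka-run-class.sh would gain: source the file only if present,
# so a missing file is not an error.
if [ -r "$KAFKA_ENV_FILE" ]; then
  . "$KAFKA_ENV_FILE"
fi

echo "KAFKA_OPTS=$KAFKA_OPTS"
rm -f "$KAFKA_ENV_FILE"
```

Because the `if [ -r ... ]` guard makes the file optional, the same script works unchanged whether or not the user has created the env file, which is what keeps it safe across package updates.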
[jira] [Commented] (KAFKA-7849) Warning when adding GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-7849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888772#comment-16888772 ] Omar Al-Safi commented on KAFKA-7849: - Thank you [~mjsax]! I want to validate my findings with you. As I understand from the code and {{DEBUG}} logs, {{GlobalStreamThread}} is responsible for subscribing to all the global topics, as I can see [here|https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L698], therefore {{StreamThread}} shouldn't subscribe to any of these topics, since {{GlobalStreamThread}} is solely responsible for them. Based on this understanding, I was looking at {{sourceTopicPattern()}} in {{InternalTopologyBuilder}}, since it is responsible for returning the topic pattern that the consumer in {{StreamThread}} should subscribe to. As far as I can see, this method doesn't take the global topics into account, although we have this information in the {{globalTopics}} HashSet. Therefore I believe that to fix this issue we just need to filter out all global topics in that method before returning the topic pattern. Do you think that makes sense?
> Warning when adding GlobalKTable > > > Key: KAFKA-7849 > URL: https://issues.apache.org/jira/browse/KAFKA-7849 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Dmitry Minkovsky >Assignee: Omar Al-Safi >Priority: Minor > Labels: newbie > > Per > https://lists.apache.org/thread.html/59c119be8a2723c501e0653fa3ed571e8c09be40d5b5170c151528b5@%3Cusers.kafka.apache.org%3E > > When I add a GlobalKTable for topic "message-write-service-user-ids-by-email" > to my topology, I get this warning: > > [2019-01-19 12:18:14,008] WARN > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:421) > [Consumer > clientId=message-write-service-55f2ca4d-0efc-4344-90d3-955f9f5a65fd-StreamThread-2-consumer, > groupId=message-write-service] The following subscribed topics are not > assigned to any members: [message-write-service-user-ids-by-email] -- This message was sent by Atlassian JIRA (v7.6.14#76016)
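The fix proposed in the comment can be pictured as a filter applied while building the subscription pattern. The following stdlib-only sketch is a simplification with invented names, not `InternalTopologyBuilder`'s actual code:

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Rough sketch of the proposed fix: when building the pattern the StreamThread
// consumer subscribes to, drop any topic owned by the GlobalStreamThread first,
// so the group coordinator never sees global topics as unassigned.
class SourceTopicPatternSketch {
    static Pattern sourceTopicPattern(Set<String> sourceTopics, Set<String> globalTopics) {
        String alternation = sourceTopics.stream()
                .filter(t -> !globalTopics.contains(t))   // the proposed filter
                .map(Pattern::quote)                      // topic names are literals, not regexes
                .collect(Collectors.joining("|"));
        return Pattern.compile(alternation);
    }

    public static void main(String[] args) {
        Set<String> source = new TreeSet<>(Set.of("orders", "message-write-service-user-ids-by-email"));
        Set<String> global = Set.of("message-write-service-user-ids-by-email");
        Pattern p = sourceTopicPattern(source, global);
        System.out.println(p.matcher("orders").matches());                                  // true
        System.out.println(p.matcher("message-write-service-user-ids-by-email").matches()); // false
    }
}
```

With the global topic excluded from the pattern, the {{StreamThread}} consumer never subscribes to it, which would remove the "subscribed topics are not assigned to any members" warning quoted below.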
[jira] [Commented] (KAFKA-822) Reassignment of partitions needs a cleanup
[ https://issues.apache.org/jira/browse/KAFKA-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888771#comment-16888771 ] Sönke Liebau commented on KAFKA-822: While I'm not 100% sure how this looks in today's code, I do believe that the /admin/reassign_partitions znode gets deleted after a finished reassignment at least (see for example KAFKA-6193). I'd be in favor of closing this due to no activity for more than 6 years. > Reassignment of partitions needs a cleanup > -- > > Key: KAFKA-822 > URL: https://issues.apache.org/jira/browse/KAFKA-822 > Project: Kafka > Issue Type: Bug > Components: controller, tools >Affects Versions: 0.8.0 >Reporter: Swapnil Ghike >Assignee: Swapnil Ghike >Priority: Major > Labels: bugs > > 1. This is probably a left-over from when the ReassignPartitionsCommand used > to be blocking: > Currently, for each partition that is reassigned, controller deletes the > /admin/reassign_partitions zk path, and populates it with a new list with the > reassigned partition removed from the original list. This is probably an > overkill, and we can delete the zk path completely once the reassignment of > all partitions has completed successfully or in error. > 2. It will help to clarify that there could be no replicas that have started > and are not in the ISR when KafkaController.onPartitionReassignment() is > called. > 3. We should batch the requests in > KafkaController.StopOldReplicasOfReassignedPartition() > 4. Update controllerContext.partitionReplicaAssignment only once in > KafkaController.updateAssignedReplicasForPartition(). > 5. Need to thoroughly test. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-1016) Broker should limit purgatory size
[ https://issues.apache.org/jira/browse/KAFKA-1016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888768#comment-16888768 ] Sönke Liebau commented on KAFKA-1016: - Is this still relevant after the Purgatory redesign in KAFKA-1430 and KAFKA-1989? It seems to me that the improvements made there would at least alleviate the issue described here due to vast performance improvements, even if no hard limit is introduced (which I'm not sure we want to do, as this basically would mean limiting the number of consumers that we are willing to serve). > Broker should limit purgatory size > -- > > Key: KAFKA-1016 > URL: https://issues.apache.org/jira/browse/KAFKA-1016 > Project: Kafka > Issue Type: Bug > Components: purgatory >Affects Versions: 0.8.0 >Reporter: Chris Riccomini >Assignee: Joel Koshy >Priority: Major > > I recently ran into a case where a poorly configured Kafka consumer was able > to trigger out of memory exceptions in multiple Kafka brokers. The consumer > was configured to have a fetcher.max.wait of Int.MaxInt. > For low volume topics, this configuration causes the consumer to block > frequently, and for long periods of time. [~junrao] informs me that the fetch > request will time out after the socket timeout is reached. In our case, this > was set to 30s. > With several thousand consumer threads, the fetch request purgatory got into > the 100,000-400,000 range, which we believe triggered the out of memory > exception. [~nehanarkhede] claims to have seen similar behavior in other high > volume clusters. > It kind of seems like a bad thing that a poorly configured consumer can > trigger out of memory exceptions in the broker. I was thinking maybe it makes > sense to have the broker try and protect itself from this situation. Here are > some potential solutions: > 1. Have a broker-side max wait config for fetch requests. > 2. Threshold the purgatory size, and either drop the oldest connections in > purgatory, or reject the newest fetch requests when purgatory is full. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
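Option 2 above (thresholding the purgatory and dropping the oldest entries) can be pictured with a toy bounded structure. Names and structure are invented for the sketch and are unrelated to the broker's real purgatory implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy illustration of the "drop the oldest" variant: cap the purgatory and
// evict the oldest delayed fetch when a new one would exceed the cap. The
// evicted request would be completed with an error in a real broker.
class BoundedPurgatory<T> {
    private final int maxSize;
    private final Deque<T> delayed = new ArrayDeque<>();

    BoundedPurgatory(int maxSize) { this.maxSize = maxSize; }

    /** Adds a delayed request; returns the evicted oldest request, or null. */
    T add(T request) {
        T evicted = null;
        if (delayed.size() == maxSize) {
            evicted = delayed.pollFirst(); // oldest entry gets dropped
        }
        delayed.addLast(request);
        return evicted;
    }

    int size() { return delayed.size(); }
}

class PurgatoryDemo {
    public static void main(String[] args) {
        BoundedPurgatory<String> purgatory = new BoundedPurgatory<>(2);
        System.out.println(purgatory.add("fetch-1")); // null
        System.out.println(purgatory.add("fetch-2")); // null
        System.out.println(purgatory.add("fetch-3")); // fetch-1
        System.out.println(purgatory.size());         // 2
    }
}
```

The trade-off mentioned in the comment is visible here: a hard cap keeps memory bounded regardless of how many consumers misbehave, but it effectively limits how many delayed fetches (and thus consumers) the broker is willing to serve at once.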
[jira] [Commented] (KAFKA-8666) Improve Documentation on usage of Materialized config object
[ https://issues.apache.org/jira/browse/KAFKA-8666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888684#comment-16888684 ] Alexander Zafirov commented on KAFKA-8666: -- Makes sense, Matthias. Yet currently, as described in the issue, we are facing a contradiction - we are throwing away information that the user already provided if they choose to build their object. That should not be the case. Before going into solution mode I'd like to agree on something - we _do_ want the described functionality to be available, right? As you said, we are using the `builder pattern` for that reason. The commit message for that code says _Add a `with(Serde keySerde, Serde valSerde)` to `Materialized` for cases where people don't care about the state store name._ That leads me to believe that maybe the code is working as intended and we should focus on highlighting that to the developer rather than finding a way to comply with the request in this issue. If we do want to continue `building` the `Materialized` object, please enlighten me: what is the standard practice with Kafka APIs with regard to backwards compatibility? My intuition is that we shouldn't break code that relies on the current implementation - which will happen if we attempt to change the way the current `with(keySerde, valueSerde)` works. If that is not the case, then one way to fix the problem is to remove the `static` modifier from the method. That would allow us to access the previously provided `storeName`.
> Improve Documentation on usage of Materialized config object > > > Key: KAFKA-8666 > URL: https://issues.apache.org/jira/browse/KAFKA-8666 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Reporter: Bill Bejeck >Priority: Major > Labels: newbie > > When using the Materialized object if the user wants to name the statestore > with > {code:java} > Materialized.as("MyStoreName"){code} > then subsequently provide the key and value serde the calls to do so must > take the form of > {code:java} > Materialized.as("MyStoreName").withKeySerde(keySerde).withValueSerde(valueSerde) > {code} > If users do the following > {code:java} > Materialized.as("MyStoreName").with(keySerde, valueSerde) > {code} > the Materialized instance created by the "as(storeName)" call is replaced by > a new Materialized instance resulting from the "with(...)" call and any > configuration on the first Materialized instance is lost. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
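One backwards-compatible direction for the pitfall described above can be sketched with a stdlib-only mock: keep the static `with(...)` factory as-is and add an instance-level variant under a different, hypothetical name that preserves the already-configured store name. This is purely illustrative and is NOT the real `Materialized` class or a committed API change:

```java
// Stdlib-only mock (invented names) of a compatibility-preserving fix idea:
// the static factory keeps its current behavior, while a new instance method
// offers the same convenience without discarding prior configuration.
final class MaterializedSketch {
    final String storeName;
    String keySerde;
    String valueSerde;

    private MaterializedSketch(String storeName) { this.storeName = storeName; }

    static MaterializedSketch as(String storeName) { return new MaterializedSketch(storeName); }

    // Existing static factory: returns a fresh instance with no store name,
    // which is exactly how as("MyStoreName").with(...) loses the name today.
    static MaterializedSketch with(String keySerde, String valueSerde) {
        MaterializedSketch m = new MaterializedSketch(null);
        m.keySerde = keySerde;
        m.valueSerde = valueSerde;
        return m;
    }

    // Hypothetical instance method under a new name: same convenience,
    // but it mutates and returns `this`, so the store name survives.
    MaterializedSketch withSerdes(String keySerde, String valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        return this;
    }
}

class MaterializedSketchDemo {
    public static void main(String[] args) {
        MaterializedSketch m = MaterializedSketch.as("MyStoreName").withSerdes("String", "Long");
        System.out.println(m.storeName); // MyStoreName
    }
}
```

Using a new method name sidesteps the breaking change: existing callers of the static `with(...)` keep compiling unchanged, while the chained form no longer silently discards the store name.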