[jira] [Updated] (KAFKA-16931) Transient REST failures to forward fenceZombie requests leave Connect Tasks in FAILED state
[ https://issues.apache.org/jira/browse/KAFKA-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-16931: -- Summary: Transient REST failures to forward fenceZombie requests leave Connect Tasks in FAILED state (was: A transient REST failure to forward fenceZombie request leaves Connect Task in FAILED state) > Transient REST failures to forward fenceZombie requests leave Connect Tasks > in FAILED state > --- > > Key: KAFKA-16931 > URL: https://issues.apache.org/jira/browse/KAFKA-16931 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Edoardo Comar >Priority: Major > > When Kafka Connect runs in exactly_once mode, a task restart will fence > possible zombie tasks. > This is achieved by forwarding the request to the leader worker using the REST > protocol. > At scale, in distributed mode, occasionally an HTTPS request may fail because > of a networking glitch, reconfiguration, etc. > Currently there is no attempt to retry the REST request; the task is left in > a FAILED state and requires an external restart (via the REST API). > Would this issue require a small KIP to introduce configuration entries to > limit the number of retries, backoff times, etc.? > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16931) A transient REST failure to forward fenceZombie request leaves Connect Task in FAILED state
Edoardo Comar created KAFKA-16931: - Summary: A transient REST failure to forward fenceZombie request leaves Connect Task in FAILED state Key: KAFKA-16931 URL: https://issues.apache.org/jira/browse/KAFKA-16931 Project: Kafka Issue Type: Bug Components: connect Reporter: Edoardo Comar When Kafka Connect runs in exactly_once mode, a task restart will fence possible zombie tasks. This is achieved by forwarding the request to the leader worker using the REST protocol. At scale, in distributed mode, occasionally an HTTPS request may fail because of a networking glitch, reconfiguration, etc. Currently there is no attempt to retry the REST request; the task is left in a FAILED state and requires an external restart (via the REST API). Would this issue require a small KIP to introduce configuration entries to limit the number of retries, backoff times, etc.?
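The retry behaviour the report asks for could look like the following minimal sketch: a generic bounded-retry helper with exponential backoff wrapped around the forwarded request. This is purely illustrative; the class name, method, and parameters are assumptions for the sketch, not Connect's actual API, and a real implementation would catch only transient I/O failures rather than all exceptions.

```java
import java.util.concurrent.Callable;

public class RestRetry {
    // Hypothetical helper: makes one initial attempt plus up to maxRetries
    // retries, doubling the backoff after each transient failure.
    public static <T> T callWithRetries(Callable<T> request, int maxRetries, long initialBackoffMs)
            throws Exception {
        long backoff = initialBackoffMs;
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return request.call();
            } catch (Exception e) {      // real code would catch only transient failures
                last = e;
                if (attempt == maxRetries) break;
                Thread.sleep(backoff);   // back off before the next attempt
                backoff *= 2;            // exponential backoff
            }
        }
        throw last;                      // give up after the retry budget is spent
    }
}
```

Whether the retry count and backoff should be configurable (and thus need a small KIP) is exactly the open question in the report.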
[jira] [Updated] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-16570: -- Fix Version/s: 3.8.0 3.7.1 > FenceProducers API returns "unexpected error" when successful > - > > Key: KAFKA-16570 > URL: https://issues.apache.org/jira/browse/KAFKA-16570 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > When we want to fence a producer using the admin client, we send an > InitProducerId request. > There is logic in that API to fence (and abort) any ongoing transactions and > that is what the API relies on to fence the producer. However, this handling > also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because > we want to actually get a new producer ID and want to retry until the ID > is supplied or we time out. > [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] > > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322] > > In the case of fencing a producer, we don't retry; instead we have no handling > for concurrent transactions and log a message about an unexpected error. > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] > > This is not unexpected, though, and the operation was successful. We should > just swallow this error and treat this as a successful run of the command.
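The fix the issue describes amounts to reclassifying CONCURRENT_TRANSACTIONS as success when fencing, since it means the ongoing transaction was aborted as part of the fence. A hypothetical sketch of that classification follows; the class, enum, and method names are illustrative only and do not reproduce the actual FenceProducersHandler code.

```java
public class FenceErrorHandling {
    public enum Outcome { SUCCESS, RETRIABLE, FATAL }

    // Hypothetical classification of InitProducerId error codes when the
    // request is issued to fence a producer (not to obtain a new producer ID).
    public static Outcome classifyFenceError(String errorCode) {
        switch (errorCode) {
            case "NONE":
            case "CONCURRENT_TRANSACTIONS": // fencing succeeded; the open transaction was aborted
                return Outcome.SUCCESS;
            case "COORDINATOR_LOAD_IN_PROGRESS":
            case "NOT_COORDINATOR":         // retry against the current coordinator
                return Outcome.RETRIABLE;
            default:                        // anything else is a genuine failure
                return Outcome.FATAL;
        }
    }
}
```

The key design point is that the same error code means "retry" in the normal producer-initialization path but "done" in the fencing path, so the two callers need different handling.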
[jira] [Commented] (KAFKA-15072) Flaky test MirrorConnectorsIntegrationExactlyOnceTest.testReplicationWithEmptyPartition
[ https://issues.apache.org/jira/browse/KAFKA-15072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17852021#comment-17852021 ] Edoardo Comar commented on KAFKA-15072: --- [~jlprat] [~cadonna] I think these failures could have been another instance of this bug https://issues.apache.org/jira/browse/KAFKA-16047 Feel free to close this issue if you agree it is no longer a problem > Flaky test > MirrorConnectorsIntegrationExactlyOnceTest.testReplicationWithEmptyPartition > --- > > Key: KAFKA-15072 > URL: https://issues.apache.org/jira/browse/KAFKA-15072 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.5.0 >Reporter: Josep Prat >Priority: Major > Labels: flaky-test > > Test > MirrorConnectorsIntegrationExactlyOnceTest.testReplicationWithEmptyPartition > became flaky again, but it's a different error this time. > Occurrence: > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testReplicationWithEmptyPartition__/] > > h3. Error Message > {code:java} > java.lang.AssertionError: Connector MirrorHeartbeatConnector tasks did not > start in time on cluster: backup-connect-cluster{code} > h3. 
Stacktrace > {code:java} > java.lang.AssertionError: Connector MirrorHeartbeatConnector tasks did not > start in time on cluster: backup-connect-cluster at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:301) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:912) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicationWithEmptyPartition(MirrorConnectorsIntegrationBaseTest.java:415) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at >
[jira] [Closed] (KAFKA-14657) Admin.fenceProducers fails when Producer has ongoing transaction - but Producer gets fenced
[ https://issues.apache.org/jira/browse/KAFKA-14657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar closed KAFKA-14657. - > Admin.fenceProducers fails when Producer has ongoing transaction - but > Producer gets fenced > --- > > Key: KAFKA-14657 > URL: https://issues.apache.org/jira/browse/KAFKA-14657 > Project: Kafka > Issue Type: Bug > Components: admin >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: FenceProducerDuringTx.java, FenceProducerOutsideTx.java > > > Admin.fenceProducers() > fails with a ConcurrentTransactionsException if invoked while a Producer has an > ongoing transaction. > However, further attempts by that producer to produce fail with > InvalidProducerEpochException and the producer is not reusable: > it cannot abort/commit as it is fenced. > An InvalidProducerEpochException is also logged as an error on the broker > [2023-01-27 17:16:32,220] ERROR [ReplicaManager broker=1] Error processing > append operation on partition topic-0 (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.InvalidProducerEpochException: Epoch of > producer 1062 at offset 84 in topic-0 is 0, which is smaller than the last > seen epoch > > Conversely, if Admin.fenceProducers() > is invoked while there is no open transaction, the call succeeds and further > attempts by that producer to produce fail with ProducerFenced. > See the attached snippets. > As the caller of Admin.fenceProducers() is likely unaware of the producer's > state, the call should succeed regardless
[jira] [Commented] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17852018#comment-17852018 ] Edoardo Comar commented on KAFKA-16570: --- https://issues.apache.org/jira/browse/KAFKA-14657 > FenceProducers API returns "unexpected error" when successful > - > > Key: KAFKA-16570 > URL: https://issues.apache.org/jira/browse/KAFKA-16570 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Edoardo Comar >Priority: Major > > When we want to fence a producer using the admin client, we send an > InitProducerId request. > There is logic in that API to fence (and abort) any ongoing transactions and > that is what the API relies on to fence the producer. However, this handling > also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because > we want to actually get a new producer ID and want to retry until the ID > is supplied or we time out. > [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] > > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322] > > In the case of fencing a producer, we don't retry; instead we have no handling > for concurrent transactions and log a message about an unexpected error. > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] > > This is not unexpected, though, and the operation was successful. We should > just swallow this error and treat this as a successful run of the command.
[jira] [Resolved] (KAFKA-14657) Admin.fenceProducers fails when Producer has ongoing transaction - but Producer gets fenced
[ https://issues.apache.org/jira/browse/KAFKA-14657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar resolved KAFKA-14657. --- Resolution: Duplicate duplicate of https://issues.apache.org/jira/browse/KAFKA-16570 > Admin.fenceProducers fails when Producer has ongoing transaction - but > Producer gets fenced > --- > > Key: KAFKA-14657 > URL: https://issues.apache.org/jira/browse/KAFKA-14657 > Project: Kafka > Issue Type: Bug > Components: admin >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: FenceProducerDuringTx.java, FenceProducerOutsideTx.java > > > Admin.fenceProducers() > fails with a ConcurrentTransactionsException if invoked while a Producer has an > ongoing transaction. > However, further attempts by that producer to produce fail with > InvalidProducerEpochException and the producer is not reusable: > it cannot abort/commit as it is fenced. > An InvalidProducerEpochException is also logged as an error on the broker > [2023-01-27 17:16:32,220] ERROR [ReplicaManager broker=1] Error processing > append operation on partition topic-0 (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.InvalidProducerEpochException: Epoch of > producer 1062 at offset 84 in topic-0 is 0, which is smaller than the last > seen epoch > > Conversely, if Admin.fenceProducers() > is invoked while there is no open transaction, the call succeeds and further > attempts by that producer to produce fail with ProducerFenced. > See the attached snippets. > As the caller of Admin.fenceProducers() is likely unaware of the producer's > state, the call should succeed regardless
[jira] [Commented] (KAFKA-16488) fix flaky MirrorConnectorsIntegrationExactlyOnceTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-16488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17852012#comment-17852012 ] Edoardo Comar commented on KAFKA-16488: --- with the above fix merged, I can rerun the test multiple times and it does not appear to be flaky to me. I'm closing as resolved [~chia7712] feel free to reopen if you do not think it's fixed by https://issues.apache.org/jira/browse/KAFKA-16047 > fix flaky MirrorConnectorsIntegrationExactlyOnceTest#testReplication > > > Key: KAFKA-16488 > URL: https://issues.apache.org/jira/browse/KAFKA-16488 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Priority: Major > > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > reset connector offsets. Error response: {"error_code":500,"message":"Failed > to perform zombie fencing for source connector prior to modifying offsets"} > at > app//org.apache.kafka.connect.util.clusters.EmbeddedConnect.resetConnectorOffsets(EmbeddedConnect.java:646) > at > app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.resetConnectorOffsets(EmbeddedConnectCluster.java:48) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.resetAllMirrorMakerConnectorOffsets(MirrorConnectorsIntegrationBaseTest.java:1063) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplication(MirrorConnectorsIntegrationExactlyOnceTest.java:90) > at > java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139) > at > 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at >
[jira] [Resolved] (KAFKA-16488) fix flaky MirrorConnectorsIntegrationExactlyOnceTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-16488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar resolved KAFKA-16488. --- Resolution: Fixed > fix flaky MirrorConnectorsIntegrationExactlyOnceTest#testReplication > > > Key: KAFKA-16488 > URL: https://issues.apache.org/jira/browse/KAFKA-16488 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Priority: Major > > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > reset connector offsets. Error response: {"error_code":500,"message":"Failed > to perform zombie fencing for source connector prior to modifying offsets"} > at > app//org.apache.kafka.connect.util.clusters.EmbeddedConnect.resetConnectorOffsets(EmbeddedConnect.java:646) > at > app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.resetConnectorOffsets(EmbeddedConnectCluster.java:48) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.resetAllMirrorMakerConnectorOffsets(MirrorConnectorsIntegrationBaseTest.java:1063) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplication(MirrorConnectorsIntegrationExactlyOnceTest.java:90) > at > java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > at >
[jira] [Commented] (KAFKA-16488) fix flaky MirrorConnectorsIntegrationExactlyOnceTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-16488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851575#comment-17851575 ] Edoardo Comar commented on KAFKA-16488: --- This failure may be due to https://issues.apache.org/jira/browse/KAFKA-16047 Although even with a fix to the above JIRA, i.e. https://github.com/apache/kafka/pull/16151 the herder REST does not appear to retry a failed invocation > fix flaky MirrorConnectorsIntegrationExactlyOnceTest#testReplication > > > Key: KAFKA-16488 > URL: https://issues.apache.org/jira/browse/KAFKA-16488 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Priority: Major > > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > reset connector offsets. Error response: {"error_code":500,"message":"Failed > to perform zombie fencing for source connector prior to modifying offsets"} > at > app//org.apache.kafka.connect.util.clusters.EmbeddedConnect.resetConnectorOffsets(EmbeddedConnect.java:646) > at > app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.resetConnectorOffsets(EmbeddedConnectCluster.java:48) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.resetAllMirrorMakerConnectorOffsets(MirrorConnectorsIntegrationBaseTest.java:1063) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplication(MirrorConnectorsIntegrationExactlyOnceTest.java:90) > at > java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139) > at > 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at >
[jira] [Comment Edited] (KAFKA-16488) fix flaky MirrorConnectorsIntegrationExactlyOnceTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-16488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851575#comment-17851575 ] Edoardo Comar edited comment on KAFKA-16488 at 6/3/24 10:30 AM: This failure may be due to https://issues.apache.org/jira/browse/KAFKA-16047 Although even with a fix to the above JIRA, i.e. [https://github.com/apache/kafka/pull/16151] the REST client in the distributed herder does not appear to retry a failed invocation was (Author: ecomar): This failure may be due to https://issues.apache.org/jira/browse/KAFKA-16047 Although even with a fix to the above JIRA, i.e. https://github.com/apache/kafka/pull/16151 the herder REST does not appear to retry a failed invocation > fix flaky MirrorConnectorsIntegrationExactlyOnceTest#testReplication > > > Key: KAFKA-16488 > URL: https://issues.apache.org/jira/browse/KAFKA-16488 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Priority: Major > > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > reset connector offsets. 
Error response: {"error_code":500,"message":"Failed > to perform zombie fencing for source connector prior to modifying offsets"} > at > app//org.apache.kafka.connect.util.clusters.EmbeddedConnect.resetConnectorOffsets(EmbeddedConnect.java:646) > at > app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.resetConnectorOffsets(EmbeddedConnectCluster.java:48) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.resetAllMirrorMakerConnectorOffsets(MirrorConnectorsIntegrationBaseTest.java:1063) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplication(MirrorConnectorsIntegrationExactlyOnceTest.java:90) > at > java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at >
[jira] [Assigned] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Edoardo Comar reassigned KAFKA-16570:
-------------------------------------
    Assignee: Edoardo Comar  (was: Justine Olshan)

> FenceProducers API returns "unexpected error" when successful
> -------------------------------------------------------------
>
>                 Key: KAFKA-16570
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16570
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Justine Olshan
>            Assignee: Edoardo Comar
>            Priority: Major
>
> When we want to fence a producer using the admin client, we send an InitProducerId request.
> There is logic in that API to fence (and abort) any ongoing transactions, and that is what the API relies on to fence the producer. However, this handling also returns CONCURRENT_TRANSACTIONS. In normal usage this is good, because we actually want to get a new producer ID and want to retry until the ID is supplied or we time out.
> [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
> In the case of fencing a producer, we don't retry; instead we have no handling for concurrent transactions, and we log a message about an unexpected error.
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
> This is not unexpected, though, and the operation was successful. We should just swallow this error and treat this as a successful run of the command.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
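The gist of the proposed fix can be sketched in isolation: map CONCURRENT_TRANSACTIONS to success instead of "unexpected error". The enum and handler below are hypothetical stand-ins for Kafka's Errors and FenceProducersHandler.handleError, not the actual internals:

```java
import java.util.Optional;

public class FenceErrorHandling {
    // Hypothetical stand-in for org.apache.kafka.common.protocol.Errors
    enum Error { NONE, CONCURRENT_TRANSACTIONS, CLUSTER_AUTHORIZATION_FAILED, UNKNOWN }

    // Returns an empty Optional on success, or a message describing the failure.
    static Optional<String> handleError(Error error) {
        switch (error) {
            case NONE:
                return Optional.empty();
            // Proposed change: the coordinator returns CONCURRENT_TRANSACTIONS while
            // it aborts the ongoing transaction, so fencing has in fact succeeded.
            case CONCURRENT_TRANSACTIONS:
                return Optional.empty();     // perhaps log.debug/log.info here
            case CLUSTER_AUTHORIZATION_FAILED:
                return Optional.of("not authorized to fence producers");
            default:
                return Optional.of("unexpected error: " + error);
        }
    }

    public static void main(String[] args) {
        System.out.println(handleError(Error.CONCURRENT_TRANSACTIONS).isEmpty()); // true
        System.out.println(handleError(Error.UNKNOWN).isPresent());              // true
    }
}
```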
[jira] [Commented] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851117#comment-17851117 ]

Edoardo Comar commented on KAFKA-16570:
---------------------------------------
[~jolshan] I'm happy to be a reviewer if you want to make a PR, or if you prefer, I can take this jira.
[jira] [Comment Edited] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851101#comment-17851101 ]

Edoardo Comar edited comment on KAFKA-16570 at 5/31/24 2:12 PM:
----------------------------------------------------------------
[~jolshan] I agree; I also think that FenceProducersHandler.handleError should handle CONCURRENT_TRANSACTIONS as a success, and maybe log.debug or log.info it.

I used a simple test like
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();
producer.commitTransaction();

admin.fenceProducers(Collections.singleton(txId)).all().get();

producer.beginTransaction();
producer.send(record).get(); // throws ProducerFencedException
{code}
while
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();

admin.fenceProducers(Collections.singleton(txId)).all().get(); // throws ConcurrentTransactionsException

producer.commitTransaction(); // 2
{code}
however, if the ConcurrentTransactionsException is swallowed, then // 2 throws ProducerFencedException as expected. For the record, another producer.send().get() before commit would instead throw an InvalidProducerEpochException.
[jira] [Comment Edited] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851101#comment-17851101 ]

Edoardo Comar edited comment on KAFKA-16570 at 5/31/24 2:12 PM:
----------------------------------------------------------------
[~jolshan] I agree; I also think that FenceProducersHandler.handleError should handle CONCURRENT_TRANSACTIONS as a success, and maybe log.debug or log.info it.

I used a simple test like
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();
producer.commitTransaction();

admin.fenceProducers(Collections.singleton(txId)).all().get();

producer.beginTransaction();
producer.send(record).get(); // throws ProducerFencedException
{code}
while
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();

admin.fenceProducers(Collections.singleton(txId)).all().get(); // throws ConcurrentTransactionsException

producer.commitTransaction(); // 2
{code}
however, if the ConcurrentTransactionsException is swallowed, then // 2 throws ProducerFencedException as expected. Btw, just for the record, another producer.send().get() before commit would instead throw an InvalidProducerEpochException.
[jira] [Comment Edited] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851101#comment-17851101 ]

Edoardo Comar edited comment on KAFKA-16570 at 5/31/24 2:08 PM:
----------------------------------------------------------------
[~jolshan] I agree; I also think that FenceProducersHandler.handleError should handle CONCURRENT_TRANSACTIONS as a success, and maybe log.debug or log.info it.

I used a simple test like
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();
producer.commitTransaction();

admin.fenceProducers(Collections.singleton(txId)).all().get();

producer.beginTransaction();
producer.send(record).get(); // throws ProducerFencedException
{code}
while
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();

admin.fenceProducers(Collections.singleton(txId)).all().get(); // throws ConcurrentTransactionsException

producer.commitTransaction(); // 2
{code}
however, if the ConcurrentTransactionsException is swallowed, then // 2 throws ProducerFencedException as expected.
[jira] [Commented] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851101#comment-17851101 ]

Edoardo Comar commented on KAFKA-16570:
---------------------------------------
[~jolshan] I agree; I also think that FenceProducersHandler.handleError should handle CONCURRENT_TRANSACTIONS as a success, and maybe log.debug or log.info it.

I used a simple test like
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();
producer.commitTransaction();

admin.fenceProducers(Collections.singleton(txId)).all().get();

producer.beginTransaction();
producer.send(record).get(); // throws ProducerFencedException
{code}
while
{code:java}
producer.initTransactions();
producer.beginTransaction();
producer.send(record).get();

admin.fenceProducers(Collections.singleton(txId)).all().get(); // throws ConcurrentTransactionsException

producer.commitTransaction(); // 2
{code}
however, if the ConcurrentTransactionsException is swallowed, then // 2 throws ProducerFencedException as expected.
[jira] [Comment Edited] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector
[ https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851019#comment-17851019 ]

Edoardo Comar edited comment on KAFKA-16047 at 5/31/24 10:02 AM:
-----------------------------------------------------------------
[~gharris1727] please see [https://github.com/apache/kafka/pull/16151]

This works IMHO as a hotfix that requires no KIP.

Arguably, all appends to the transaction log could use this timeout, rather than the transaction timeout, which has a completely different semantic: it is tied to the transaction, not to the appending of a record and its replication. However, I think that such a change may require a KIP and may have a much larger impact than just fixing Admin.fenceProducers and distributed Connect.

> Source connector with EOS enabled have some InitProducerId requests timing
> out, effectively failing all the tasks & the whole connector
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-16047
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16047
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect, mirrormaker
>    Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, 3.6.1
>            Reporter: Angelos Kaltsikis
>            Assignee: Edoardo Comar
>            Priority: Major
>
> Source Connectors with 'exactly.once.support = required' may have some of their tasks' InitProducerId requests from the admin client time out.
> In the case of MirrorSourceConnector, which was the source connector where I found the bug, the bug was effectively making all the tasks become "FAILED". As soon as one of the tasks gets FAILED due to the 'COORDINATOR_NOT_AVAILABLE' messages (due to timeouts), no matter how many restarts I did to the connector/tasks, I couldn't get the MirrorSourceConnector into a healthy RUNNING state again.
> Due to the low timeout that has been [hard-coded in the code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87] (1ms), there is a chance that the InitProducerId requests time out in the case of "slower-than-expected" Kafka brokers (that do not process & respond to the above request in <= 1ms). (Feel free to read more information about the issue in the "More Context" section below.)
> [~ChrisEgerton] I would appreciate it if you could respond to the following questions:
> - How and why was the 1ms magic number for the transaction timeout chosen?
> - Is there any specific reason to believe the InitProducerId request can be processed in such a small time window?
> - I have tried the above in multiple different Kafka clusters that are hosted on different underlying datacenter hosts, and I don't believe those brokers are "slow" for some reason. If you feel that the brokers are slower than expected, I would appreciate any pointers on how I could find out what the bottleneck is.
> h3. Temporary Mitigation
> I have increased the timeout to 1000ms (randomly picked this number, just wanted to give brokers enough time to always complete these types of requests). The fix can be found in my fork https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f
> h3. Final solution
> The temporary mitigation is not ideal, as it still randomly picks a timeout for such an operation, which may be high enough, but it is not ensured that it will always be high enough. Shall we introduce something client-configurable?
> At the same time, I was thinking whether it makes sense to introduce some tests that simulate brokers slower than the "blazing fast" mocked brokers that exist in unit tests, so as to be able to catch the kind of low timeouts that potentially make some software features unusable.
> h3. What is affected
> The above bug exists in MirrorSourceConnector tasks running in a distributed Kafka Connect cluster, or MirrorMaker 2 jobs that run with distributed mode enabled (a prerequisite for exactly.once.support to work). I believe this should be true for other SourceConnectors as well (as the code path to blame is Connect specific & not MirrorMaker specific).
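The "Final solution" question above amounts to replacing the hard-coded 1 ms transaction timeout with a value derived from client configuration. A hedged sketch of that direction, assuming the timeout would be bounded by the admin client's request.timeout.ms with a sensible floor; the class and constants here are illustrative, not the actual Kafka internals:

```java
public class FencingTimeout {
    static final int HARD_CODED_TXN_TIMEOUT_MS = 1;  // current behaviour in 3.6.1
    static final int MIN_TXN_TIMEOUT_MS = 1_000;     // illustrative floor (assumption)

    // Proposed: derive the fencing transaction timeout from the client's
    // request.timeout.ms instead of pinning it to 1 ms.
    static int fencingTxnTimeoutMs(int requestTimeoutMs) {
        return Math.max(MIN_TXN_TIMEOUT_MS, requestTimeoutMs);
    }

    public static void main(String[] args) {
        // With the default admin request.timeout.ms of 30000, the broker would
        // have 30 s, not 1 ms, to append and replicate the transition record.
        System.out.println(fencingTxnTimeoutMs(30_000)); // 30000
    }
}
```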
[jira] [Commented] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector
[ https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851021#comment-17851021 ]

Edoardo Comar commented on KAFKA-16047:
---------------------------------------
^ cc [~ChrisEgerton] [~cegerton]

> h3. More context & logs
> *Connector Logs*
> {code:java}
> Caused by: java.util.concurrent.CompletionException:
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node
> assignment. Call: fenceProducer(api=INIT_PRODUCER_ID)
> {code}
> *Broker Logs*
> {code:java}
> [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=] Returning
> COORDINATOR_NOT_AVAILABLE error code to client for
> kafka-connect-uat-mm2-msc-20th-7's InitProducerId request
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]:
> TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for
> TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2,
> lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(),
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed
> due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty),
> aborting state transition and
> {code}
[jira] [Commented] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector
[ https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851019#comment-17851019 ]

Edoardo Comar commented on KAFKA-16047:
---------------------------------------
[~gharris1727] please see [https://github.com/apache/kafka/pull/16151]

This works IMHO as a hotfix that requires no KIP.

Arguably, all appends to the transaction log could use this timeout, rather than the transaction timeout, which has a completely different semantic: it is tied to the transaction, not to the appending of a record and its replication. However, I think that such a change requires a KIP and may have a much larger impact than just fixing Admin.fenceProducers and distributed Connect.
[jira] [Commented] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector
[ https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850864#comment-17850864 ] Edoardo Comar commented on KAFKA-16047: --- [~gharris1727] I see that for a common produce request, ReplicaManager.appendRecords uses the timeout set as ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, i.e. CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG. Would it not make sense to always use that timeout when appending to a log? BTW, MirrorMaker2 is unusable in a Connect cluster set up with exactly-once support when the broker server.properties are changed from the settings used in development testing and present in the config properties files: transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 [~akaltsikis] I see that you could not progress your PR. Are you happy to hand over this bugfix? > Source connector with EOS enabled have some InitProducerId requests timing > out, effectively failing all the tasks & the whole connector > --- > > Key: KAFKA-16047 > URL: https://issues.apache.org/jira/browse/KAFKA-16047 > Project: Kafka > Issue Type: Bug > Components: connect, mirrormaker >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, > 3.6.1 >Reporter: Angelos Kaltsikis >Assignee: Edoardo Comar >Priority: Major > > Source Connectors with 'exactly.once.support = required' may have some of > their tasks that issue InitProducerId requests from the admin client time out. > In the case of MirrorSourceConnector, the source connector in which I > found the bug, this was effectively making all the tasks become "FAILED". 
As soon as one of the tasks gets FAILED due to the > 'COORDINATOR_NOT_AVAILABLE' messages (due to timeouts), no matter how many > restarts I did to the connector/tasks, I couldn't get the > MirrorSourceConnector into a healthy RUNNING state again. > Due to the low timeout that has been [hard-coded in the > code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87] > (1 ms), there is a chance that the `InitProducerId` requests time out in case > of "slower-than-expected" Kafka brokers (that do not process & respond to the > above request in <= 1 ms). (Feel free to read more about the issue > in the "More Context" section below.) > [~ChrisEgerton] I would appreciate it if you could respond to the following > questions: > - How and why was the 1 ms magic number for the transaction timeout chosen? > - Is there any specific reason to be confident that the > `InitProducerId` request can be processed in such a small time window? > - I have tried the above in multiple different Kafka clusters that are hosted > on different underlying datacenter hosts and I don't believe that those > brokers are "slow" for some reason. If you feel that the brokers are slower > than expected, I would appreciate any pointers on how I could find out where > the bottleneck is. > h3. Temporary Mitigation > I have increased the timeout to 1000 ms (randomly picked this number, just > wanted to give the brokers enough time to always complete this type of > request). The fix can be found in my fork > https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f > > h3. Final solution > The temporary mitigation is not ideal, as it still randomly picks a timeout > for such an operation which may be high enough but is not guaranteed to > always be high enough. Shall we introduce something client-configurable? 
> At the same time, I was wondering whether it makes sense to introduce some > tests that simulate brokers slower than the "blazing fast" mocked brokers that exist > in unit tests, so as to be able to catch this type of low timeout that > potentially makes some software features unusable. > h3. What is affected > The above bug exists in MirrorSourceConnector tasks running in a distributed > Kafka Connect cluster or MirrorMaker 2 jobs that run with distributed mode > enabled (a prerequisite for exactly.once.support to work). I believe this > should be true for other SourceConnectors as well (as the code path > to blame is Connect-specific & not MirrorMaker-specific). > h3. More context & logs > *Connector Logs* > {code:java} > Caused by: java.util.concurrent.CompletionException: > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node > assignment. Call:
[jira] [Assigned] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector
[ https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reassigned KAFKA-16047: - Assignee: Edoardo Comar > Source connector with EOS enabled have some InitProducerId requests timing > out, effectively failing all the tasks & the whole connector > --- > > Key: KAFKA-16047 > URL: https://issues.apache.org/jira/browse/KAFKA-16047 > Project: Kafka > Issue Type: Bug > Components: connect, mirrormaker >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, > 3.6.1 >Reporter: Angelos Kaltsikis >Assignee: Edoardo Comar >Priority: Major > > Source Connectors with 'exactly.once.support = required' may have some of > their tasks that issue InitProducerId requests from the admin client time out. > In the case of MirrorSourceConnector, the source connector in which I > found the bug, this was effectively making all the tasks become "FAILED". As soon as one of the tasks gets FAILED due to the > 'COORDINATOR_NOT_AVAILABLE' messages (due to timeouts), no matter how many > restarts I did to the connector/tasks, I couldn't get the > MirrorSourceConnector into a healthy RUNNING state again. > Due to the low timeout that has been [hard-coded in the > code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87] > (1 ms), there is a chance that the `InitProducerId` requests time out in case > of "slower-than-expected" Kafka brokers (that do not process & respond to the > above request in <= 1 ms). (Feel free to read more about the issue > in the "More Context" section below.) > [~ChrisEgerton] I would appreciate it if you could respond to the following > questions: > - How and why was the 1 ms magic number for the transaction timeout chosen? 
> - Is there any specific reason to be confident that the > `InitProducerId` request can be processed in such a small time window? > - I have tried the above in multiple different Kafka clusters that are hosted > on different underlying datacenter hosts and I don't believe that those > brokers are "slow" for some reason. If you feel that the brokers are slower > than expected, I would appreciate any pointers on how I could find out where > the bottleneck is. > h3. Temporary Mitigation > I have increased the timeout to 1000 ms (randomly picked this number, just > wanted to give the brokers enough time to always complete this type of > request). The fix can be found in my fork > https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f > > h3. Final solution > The temporary mitigation is not ideal, as it still randomly picks a timeout > for such an operation which may be high enough but is not guaranteed to > always be high enough. Shall we introduce something client-configurable? > At the same time, I was wondering whether it makes sense to introduce some > tests that simulate brokers slower than the "blazing fast" mocked brokers that exist > in unit tests, so as to be able to catch this type of low timeout that > potentially makes some software features unusable. > h3. What is affected > The above bug exists in MirrorSourceConnector tasks running in a distributed > Kafka Connect cluster or MirrorMaker 2 jobs that run with distributed mode > enabled (a prerequisite for exactly.once.support to work). I believe this > should be true for other SourceConnectors as well (as the code path > to blame is Connect-specific & not MirrorMaker-specific). > h3. More context & logs > *Connector Logs* > {code:java} > Caused by: java.util.concurrent.CompletionException: > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node > assignment. 
Call: fenceProducer(api=INIT_PRODUCER_ID) > {code} > *Broker Logs* > {code:java} > [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=] Returning > COORDINATOR_NOT_AVAILABLE error code to client for > kafka-connect-uat-mm2-msc-20th-7's InitProducerId request > (kafka.coordinator.transaction.TransactionCoordinator) > [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: > TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for > TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, > lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), > txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed > due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), > aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the >
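The discussion above centers on the 1 ms transaction timeout hard-coded in FenceProducersHandler. The direction suggested in the issue can be sketched roughly as follows; the helper class and the idea of a client-supplied value are illustrative assumptions, not an actual Kafka API or configuration:

```java
// Sketch only: illustrates replacing the hard-coded 1 ms fencing
// transaction timeout with a client-supplied value. FenceTimeoutSketch
// and its method names are hypothetical, not real Kafka code.
public class FenceTimeoutSketch {

    // Current behaviour: FenceProducersHandler hard-codes 1 ms.
    static final int HARD_CODED_TXN_TIMEOUT_MS = 1;

    /** Returns the configured timeout if valid, else the legacy default. */
    static int effectiveTxnTimeoutMs(Integer configuredMs) {
        if (configuredMs != null && configuredMs > 0) {
            return configuredMs;
        }
        return HARD_CODED_TXN_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        // No configuration: falls back to the legacy 1 ms value.
        System.out.println(effectiveTxnTimeoutMs(null));
        // The 1000 ms mitigation picked in the fork mentioned above.
        System.out.println(effectiveTxnTimeoutMs(1000));
    }
}
```

A config-driven value (for example, reusing request.timeout.ms as suggested in the comment above) would remove the magic number while keeping the old behaviour as a fallback.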
[jira] [Commented] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation
[ https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849391#comment-17849391 ] Edoardo Comar commented on KAFKA-15905: --- backported to 3.7.1 > Restarts of MirrorCheckpointTask should not permanently interrupt offset > translation > > > Key: KAFKA-15905 > URL: https://issues.apache.org/jira/browse/KAFKA-15905 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.7.1, 3.8 > > > Executive summary: When the MirrorCheckpointTask restarts, it loses the state > of checkpointsPerConsumerGroup, which limits offset translation to records > mirrored after the latest restart. > For example, if 1000 records are mirrored and the OffsetSyncs are read by > MirrorCheckpointTask, the emitted checkpoints are cached, and translation can > happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more > records are mirrored, translation can happen at the ~1500th record, but no > longer at the ~500th record. > Context: > Before KAFKA-13659, MM2 made translation decisions based on the > incompletely-initialized OffsetSyncStore, and the checkpoint could appear to > go backwards temporarily during restarts. To fix this, we forced the > OffsetSyncStore to initialize completely before translation could take place, > ensuring that the latest OffsetSync had been read, and thus providing the > most accurate translation. > Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. > Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to > allow for translation of earlier offsets. This came with the caveat that the > cache's sparseness allowed translations to go backwards permanently. 
To > prevent this behavior, a cache of the latest Checkpoints was kept in the > MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset > translation remained restricted to the fully-initialized OffsetSyncStore. > Effectively, the MirrorCheckpointTask ensures that it translates based on an > OffsetSync emitted during its lifetime, to ensure that no previous > MirrorCheckpointTask emitted a later sync. If we can read the checkpoints > emitted by previous generations of MirrorCheckpointTask, we can still ensure > that checkpoints are monotonic, while allowing translation further back in > history. -- This message was sent by Atlassian Jira (v8.20.10#820010)
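The monotonicity argument above can be sketched as a small guard: a checkpoint is emitted only if it does not move the translated offset backwards, and the cache can be seeded from checkpoints read back from previous task generations. The class and field names below are illustrative; the real state lives in MirrorCheckpointTask#checkpointsPerConsumerGroup:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the monotonic-checkpoint guard described above.
// Hypothetical names; not the actual MirrorCheckpointTask implementation.
public class CheckpointGuardSketch {
    // consumer group -> last emitted downstream offset
    private final Map<String, Long> lastEmitted = new HashMap<>();

    /** Seed from checkpoints emitted by previous task generations. */
    public void seed(String group, long downstreamOffset) {
        lastEmitted.merge(group, downstreamOffset, Math::max);
    }

    /** Emit only if the translation does not go backwards. */
    public boolean shouldEmit(String group, long downstreamOffset) {
        long prev = lastEmitted.getOrDefault(group, Long.MIN_VALUE);
        if (downstreamOffset < prev) {
            return false; // would move the checkpoint backwards
        }
        lastEmitted.put(group, downstreamOffset);
        return true;
    }
}
```

Seeding from read-back checkpoints is what lets a restarted task keep translating offsets older than its own lifetime without ever emitting a non-monotonic checkpoint.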
[jira] [Commented] (KAFKA-16622) MirrorMaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849390#comment-17849390 ] Edoardo Comar commented on KAFKA-16622: --- backported to 3.7.1 > MirrorMaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.7.1, 3.8 > > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (MirrorMaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10): > create a single topic in the source cluster, > produce data to it (e.g. 1 records), > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls, committing after each poll - and > watch the Checkpoint topic in the target cluster: > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation
[ https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reopened KAFKA-15905: --- reopening for backporting to 3.7.1, to be confirmed > Restarts of MirrorCheckpointTask should not permanently interrupt offset > translation > > > Key: KAFKA-15905 > URL: https://issues.apache.org/jira/browse/KAFKA-15905 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.7.1, 3.8 > > > Executive summary: When the MirrorCheckpointTask restarts, it loses the state > of checkpointsPerConsumerGroup, which limits offset translation to records > mirrored after the latest restart. > For example, if 1000 records are mirrored and the OffsetSyncs are read by > MirrorCheckpointTask, the emitted checkpoints are cached, and translation can > happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more > records are mirrored, translation can happen at the ~1500th record, but no > longer at the ~500th record. > Context: > Before KAFKA-13659, MM2 made translation decisions based on the > incompletely-initialized OffsetSyncStore, and the checkpoint could appear to > go backwards temporarily during restarts. To fix this, we forced the > OffsetSyncStore to initialize completely before translation could take place, > ensuring that the latest OffsetSync had been read, and thus providing the > most accurate translation. > Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. > Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to > allow for translation of earlier offsets. This came with the caveat that the > cache's sparseness allowed translations to go backwards permanently. 
To > prevent this behavior, a cache of the latest Checkpoints was kept in the > MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset > translation remained restricted to the fully-initialized OffsetSyncStore. > Effectively, the MirrorCheckpointTask ensures that it translates based on an > OffsetSync emitted during its lifetime, to ensure that no previous > MirrorCheckpointTask emitted a later sync. If we can read the checkpoints > emitted by previous generations of MirrorCheckpointTask, we can still ensure > that checkpoints are monotonic, while allowing translation further back in > history. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation
[ https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-15905: -- Fix Version/s: 3.7.1 > Restarts of MirrorCheckpointTask should not permanently interrupt offset > translation > > > Key: KAFKA-15905 > URL: https://issues.apache.org/jira/browse/KAFKA-15905 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.7.1, 3.8 > > > Executive summary: When the MirrorCheckpointTask restarts, it loses the state > of checkpointsPerConsumerGroup, which limits offset translation to records > mirrored after the latest restart. > For example, if 1000 records are mirrored and the OffsetSyncs are read by > MirrorCheckpointTask, the emitted checkpoints are cached, and translation can > happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more > records are mirrored, translation can happen at the ~1500th record, but no > longer at the ~500th record. > Context: > Before KAFKA-13659, MM2 made translation decisions based on the > incompletely-initialized OffsetSyncStore, and the checkpoint could appear to > go backwards temporarily during restarts. To fix this, we forced the > OffsetSyncStore to initialize completely before translation could take place, > ensuring that the latest OffsetSync had been read, and thus providing the > most accurate translation. > Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. > Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to > allow for translation of earlier offsets. This came with the caveat that the > cache's sparseness allowed translations to go backwards permanently. 
To > prevent this behavior, a cache of the latest Checkpoints was kept in the > MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset > translation remained restricted to the fully-initialized OffsetSyncStore. > Effectively, the MirrorCheckpointTask ensures that it translates based on an > OffsetSync emitted during its lifetime, to ensure that no previous > MirrorCheckpointTask emitted a later sync. If we can read the checkpoints > emitted by previous generations of MirrorCheckpointTask, we can still ensure > that checkpoints are monotonic, while allowing translation further back in > history. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16641) MM2 offset translation should interpolate between sparse OffsetSyncs
[ https://issues.apache.org/jira/browse/KAFKA-16641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848563#comment-17848563 ] Edoardo Comar commented on KAFKA-16641: --- [~gharris] which pathological scenarios are you thinking of? Frequent aborted transactions on a source topic? A source topic frequently deleted / recreated with the same name? > MM2 offset translation should interpolate between sparse OffsetSyncs > > > Key: KAFKA-16641 > URL: https://issues.apache.org/jira/browse/KAFKA-16641 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Greg Harris >Priority: Major > > Right now, the OffsetSyncStore keeps a sparse offset store, with exponential > spacing between syncs. This can leave large gaps in translation, where > offsets are translated much more conservatively than necessary. > The dominant way to use MirrorMaker2 is in a "single writer" fashion, where > the target topic is only written to by a single MirrorMaker 2. When a topic > without gaps is replicated, contiguous blocks of offsets are preserved. For > example: > Say that MM2 mirrors 100 records, and emits two syncs: 0:100 and 100:200. We > can detect when the gap between the upstream and downstream offsets is the > same using subtraction, and then assume that 50:150 is also a valid > translation. If the source topic has gaps, or goes through a restart, we > should expect a discontinuity in the offset syncs, like 0:100 and 100:250 or > 0:100 and 100:150. > This may allow us to restore much of the offset translation precision that > was lost for simple contiguous topics, without additional memory usage, but > at the risk of mis-translating some pathological situations when the source > topic has gaps. This might be enabled unconditionally, or > via a configuration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
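The subtraction check proposed in the issue above can be sketched as follows. The method and its sync-pair representation are illustrative assumptions, not the actual OffsetSyncStore API:

```java
import java.util.OptionalLong;

// Sketch of interpolating between two sparse OffsetSyncs, as proposed above.
// A sync (u, d) means upstream offset u maps to downstream offset d.
// Hypothetical helper; the real logic would live in OffsetSyncStore.
public class OffsetInterpolationSketch {

    /**
     * If the two syncs bound a contiguous block (same upstream and
     * downstream span), any offset in between can be translated linearly;
     * otherwise fall back to the conservative earlier sync.
     */
    static OptionalLong translate(long u1, long d1, long u2, long d2, long upstream) {
        if (upstream < u1 || upstream > u2) {
            return OptionalLong.empty(); // outside the bracketed range
        }
        if ((d2 - d1) == (u2 - u1)) {
            // Contiguous block, e.g. syncs 0:100 and 100:200 let us
            // translate upstream 50 to downstream 150.
            return OptionalLong.of(d1 + (upstream - u1));
        }
        // Discontinuity (gaps or restart), e.g. 0:100 and 100:250:
        // be conservative and translate to the earlier sync's offset.
        return OptionalLong.of(d1);
    }
}
```

The equal-span check is what guards against the discontinuity cases (0:100 and 100:250, or 0:100 and 100:150) mentioned in the description.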
[jira] [Updated] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation
[ https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-15905: -- Fix Version/s: 3.7.1 3.8 (was: 3.80) (was: 3.71) > Restarts of MirrorCheckpointTask should not permanently interrupt offset > translation > > > Key: KAFKA-15905 > URL: https://issues.apache.org/jira/browse/KAFKA-15905 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.7.1, 3.8 > > > Executive summary: When the MirrorCheckpointTask restarts, it loses the state > of checkpointsPerConsumerGroup, which limits offset translation to records > mirrored after the latest restart. > For example, if 1000 records are mirrored and the OffsetSyncs are read by > MirrorCheckpointTask, the emitted checkpoints are cached, and translation can > happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more > records are mirrored, translation can happen at the ~1500th record, but no > longer at the ~500th record. > Context: > Before KAFKA-13659, MM2 made translation decisions based on the > incompletely-initialized OffsetSyncStore, and the checkpoint could appear to > go backwards temporarily during restarts. To fix this, we forced the > OffsetSyncStore to initialize completely before translation could take place, > ensuring that the latest OffsetSync had been read, and thus providing the > most accurate translation. > Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. > Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to > allow for translation of earlier offsets. This came with the caveat that the > cache's sparseness allowed translations to go backwards permanently. 
To > prevent this behavior, a cache of the latest Checkpoints was kept in the > MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset > translation remained restricted to the fully-initialized OffsetSyncStore. > Effectively, the MirrorCheckpointTask ensures that it translates based on an > OffsetSync emitted during its lifetime, to ensure that no previous > MirrorCheckpointTask emitted a later sync. If we can read the checkpoints > emitted by previous generations of MirrorCheckpointTask, we can still ensure > that checkpoints are monotonic, while allowing translation further back in > history. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16622) MirrorMaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-16622: -- Fix Version/s: 3.7.1 3.8 > MirrorMaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.7.1, 3.8 > > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (MirrorMaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10): > create a single topic in the source cluster, > produce data to it (e.g. 1 records), > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls, committing after each poll - and > watch the Checkpoint topic in the target cluster: > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation
[ https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-15905: -- Fix Version/s: 3.80 3.71 > Restarts of MirrorCheckpointTask should not permanently interrupt offset > translation > > > Key: KAFKA-15905 > URL: https://issues.apache.org/jira/browse/KAFKA-15905 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.80, 3.71 > > > Executive summary: When the MirrorCheckpointTask restarts, it loses the state > of checkpointsPerConsumerGroup, which limits offset translation to records > mirrored after the latest restart. > For example, if 1000 records are mirrored and the OffsetSyncs are read by > MirrorCheckpointTask, the emitted checkpoints are cached, and translation can > happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more > records are mirrored, translation can happen at the ~1500th record, but no > longer at the ~500th record. > Context: > Before KAFKA-13659, MM2 made translation decisions based on the > incompletely-initialized OffsetSyncStore, and the checkpoint could appear to > go backwards temporarily during restarts. To fix this, we forced the > OffsetSyncStore to initialize completely before translation could take place, > ensuring that the latest OffsetSync had been read, and thus providing the > most accurate translation. > Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. > Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to > allow for translation of earlier offsets. This came with the caveat that the > cache's sparseness allowed translations to go backwards permanently. 
To > prevent this behavior, a cache of the latest Checkpoints was kept in the > MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset > translation remained restricted to the fully-initialized OffsetSyncStore. > Effectively, the MirrorCheckpointTask ensures that it translates based on an > OffsetSync emitted during its lifetime, to ensure that no previous > MirrorCheckpointTask emitted a later sync. If we can read the checkpoints > emitted by previous generations of MirrorCheckpointTask, we can still ensure > that checkpoints are monotonic, while allowing translation further back in > history. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation
[ https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848520#comment-17848520 ] Edoardo Comar commented on KAFKA-15905: --- [https://github.com/apache/kafka/pull/15910] fixes both this issue https://issues.apache.org/jira/browse/KAFKA-15905 and the related https://issues.apache.org/jira/browse/KAFKA-16622; we could backport the fix to 3.7.1. > Restarts of MirrorCheckpointTask should not permanently interrupt offset > translation > > > Key: KAFKA-15905 > URL: https://issues.apache.org/jira/browse/KAFKA-15905 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Edoardo Comar >Priority: Major > > Executive summary: When the MirrorCheckpointTask restarts, it loses the state > of checkpointsPerConsumerGroup, which limits offset translation to records > mirrored after the latest restart. > For example, if 1000 records are mirrored and the OffsetSyncs are read by > MirrorCheckpointTask, the emitted checkpoints are cached, and translation can > happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more > records are mirrored, translation can happen at the ~1500th record, but no > longer at the ~500th record. > Context: > Before KAFKA-13659, MM2 made translation decisions based on the > incompletely-initialized OffsetSyncStore, and the checkpoint could appear to > go backwards temporarily during restarts. To fix this, we forced the > OffsetSyncStore to initialize completely before translation could take place, > ensuring that the latest OffsetSync had been read, and thus providing the > most accurate translation. > Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. > Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to > allow for translation of earlier offsets. This came with the caveat that the > cache's sparseness allowed translations to go backwards permanently. 
To > prevent this behavior, a cache of the latest Checkpoints was kept in the > MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset > translation remained restricted to the fully-initialized OffsetSyncStore. > Effectively, the MirrorCheckpointTask ensures that it translates based on an > OffsetSync emitted during its lifetime, to ensure that no previous > MirrorCheckpointTask emitted a later sync. If we can read the checkpoints > emitted by previous generations of MirrorCheckpointTask, we can still ensure > that checkpoints are monotonic, while allowing translation further back in > history. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16622) MirrorMaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848519#comment-17848519 ] Edoardo Comar commented on KAFKA-16622: --- [https://github.com/apache/kafka/pull/15910] fixes both this https://issues.apache.org/jira/browse/KAFKA-16622 and https://issues.apache.org/jira/browse/KAFKA-15905; we could backport the fix to 3.7.1. > MirrorMaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (MirrorMaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10): > create a single topic in the source cluster, > produce data to it (e.g. 1 records), > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls, committing after each poll - and > watch the Checkpoint topic in the target cluster: > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reassigned KAFKA-16622: - Assignee: Edoardo Comar > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (MirrorMaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation
[ https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reassigned KAFKA-15905: - Assignee: Edoardo Comar > Restarts of MirrorCheckpointTask should not permanently interrupt offset > translation > > > Key: KAFKA-15905 > URL: https://issues.apache.org/jira/browse/KAFKA-15905 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Edoardo Comar >Priority: Major > > Executive summary: When the MirrorCheckpointTask restarts, it loses the state > of checkpointsPerConsumerGroup, which limits offset translation to records > mirrored after the latest restart. > For example, if 1000 records are mirrored and the OffsetSyncs are read by > MirrorCheckpointTask, the emitted checkpoints are cached, and translation can > happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more > records are mirrored, translation can happen at the ~1500th record, but no > longer at the ~500th record. > Context: > Before KAFKA-13659, MM2 made translation decisions based on the > incompletely-initialized OffsetSyncStore, and the checkpoint could appear to > go backwards temporarily during restarts. To fix this, we forced the > OffsetSyncStore to initialize completely before translation could take place, > ensuring that the latest OffsetSync had been read, and thus providing the > most accurate translation. > Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. > Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to > allow for translation of earlier offsets. This came with the caveat that the > cache's sparseness allowed translations to go backwards permanently. To > prevent this behavior, a cache of the latest Checkpoints was kept in the > MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset > translation remained restricted to the fully-initialized OffsetSyncStore. 
> Effectively, the MirrorCheckpointTask ensures that it translates based on an > OffsetSync emitted during its lifetime, to ensure that no previous > MirrorCheckpointTask emitted a later sync. If we can read the checkpoints > emitted by previous generations of MirrorCheckpointTask, we can still ensure > that checkpoints are monotonic, while allowing translation further back in > history. -- This message was sent by Atlassian Jira (v8.20.10#820010)
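The idea in the paragraph above (re-reading checkpoints emitted by previous MirrorCheckpointTask generations so that translation stays monotonic) can be sketched as follows. This is a minimal illustration with hypothetical class and method names, not the actual Kafka Connect internals:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: remember the latest emitted downstream offset per
// (group, topic-partition) key, seed it from checkpoints written by earlier
// task generations, and suppress any checkpoint that would move backwards.
class MonotonicCheckpointCache {
    private final Map<String, Long> lastDownstream = new HashMap<>();

    // Seed from a checkpoint emitted by a previous MirrorCheckpointTask generation.
    void seed(String key, long downstreamOffset) {
        lastDownstream.merge(key, downstreamOffset, Math::max);
    }

    // Returns true if a checkpoint at this downstream offset may be emitted,
    // i.e. it does not go backwards relative to what was already published.
    boolean maybeEmit(String key, long downstreamOffset) {
        Long prev = lastDownstream.get(key);
        if (prev != null && downstreamOffset < prev) {
            return false; // would translate backwards: suppress
        }
        lastDownstream.put(key, downstreamOffset);
        return true;
    }

    public static void main(String[] args) {
        MonotonicCheckpointCache cache = new MonotonicCheckpointCache();
        cache.seed("mygroup1/source.mytopic-0", 500L); // from before a restart
        System.out.println(cache.maybeEmit("mygroup1/source.mytopic-0", 400L)); // false
        System.out.println(cache.maybeEmit("mygroup1/source.mytopic-0", 600L)); // true
    }
}
```

Without the seeding step, a restarted task would see no prior state and could only translate offsets mirrored after its own start, which is the behavior this issue describes.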
[jira] [Commented] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841927#comment-17841927 ] Edoardo Comar commented on KAFKA-16622: --- related issue https://issues.apache.org/jira/browse/KAFKA-16364 > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (MirrorMaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841137#comment-17841137 ] Edoardo Comar edited comment on KAFKA-16622 at 4/27/24 10:55 AM: - if after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 restarts before the other 1 messages are produced, then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up with just two checkpoints, at 1 and 2. But the problem here is that if the consumer never fully catches up once, we will never have a checkpoint. If as initial state the OffsetSyncStore.offsetSyncs contained a distribution of OffsetSync rather than just multiple copies of the last OffsetSync, Checkpoints would be computed earlier, I think. was (Author: ecomar): if after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 restarts before the other 1 messages are produced, then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up with just two checkpoints, at 1 and 2. But the problem here is that if the consumer never fully catches up once, we will never have a checkpoint. If the OffsetSyncStore.offsetSyncs contained a distribution of OffsetSync rather than just multiple copies of the last OffsetSync, Checkpoints would be computed before. > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (MirrorMaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
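The suggestion in the comment above (pre-populating OffsetSyncStore.offsetSyncs with a spread of historical OffsetSyncs rather than multiple copies of the latest one) can be illustrated with a minimal sketch; the class and method names here are hypothetical, not the real MM2 types:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: store a spread of (upstreamOffset -> downstreamOffset)
// syncs. A lagging consumer's committed upstream offset can then be translated
// via the closest sync at or below it, so a first checkpoint can be emitted
// before the consumer group has ever fully caught up.
class SparseSyncStore {
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    void addSync(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    // Translate a committed upstream offset; returns -1 when no sync is old enough.
    long translateDownstream(long upstreamCommitted) {
        Map.Entry<Long, Long> entry = syncs.floorEntry(upstreamCommitted);
        return entry == null ? -1 : entry.getValue();
    }

    public static void main(String[] args) {
        SparseSyncStore store = new SparseSyncStore();
        // A distribution of historical syncs, not just copies of the latest one.
        store.addSync(0L, 0L);
        store.addSync(5000L, 4990L);
        store.addSync(10000L, 9980L);
        // A consumer lagging at upstream offset 6200 can already be translated.
        System.out.println(store.translateDownstream(6200L)); // 4990
    }
}
```

If the store instead held only copies of the latest sync (here 10000), a committed offset of 6200 would have no translatable sync at or below it, which matches the "no checkpoint until fully caught up" symptom.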
[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841137#comment-17841137 ] Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 6:04 PM: if after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 restarts before the other 1 messages are produced, then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up with just two checkpoints, at 1 and 2. But the problem here is that if the consumer never fully catches up once, we will never have a checkpoint. If the OffsetSyncStore.offsetSyncs contained a distribution of OffsetSync rather than just multiple copies of the last OffsetSync, Checkpoints would be computed before. was (Author: ecomar): if after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 restarts before the other 1 messages are produced, then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up with just two checkpoints, at 1 and 2. But the problem here is that if the consumer never fully catches up once, we will never have a checkpoint > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (MirrorMaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841137#comment-17841137 ] Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:40 AM: - if after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 restarts before the other 1 messages are produced, then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up with just two checkpoints, at 1 and 2. But the problem here is that if the consumer never fully catches up once, we will never have a checkpoint was (Author: ecomar): if after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 restarts before the other 1 messages are produced, then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up with just two checkpoints, at 1 and 2. > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (MirrorMaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841137#comment-17841137 ] Edoardo Comar commented on KAFKA-16622: --- if after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 restarts before the other 1 messages are produced, then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up with just two checkpoints, at 1 and 2. > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (MirrorMaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134 ] Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:18 AM: - Hi [~gharris1727] The task did not restart. Here are the trace logs generated in the following scenario:
clean start with the properties file posted above
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop - polls 100 messages - commitSync - waits 1 sec
e.g.

clientProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
...
while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    polledCount = polledCount + print(crs1, consumer);
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 1. Then other 1 messages are produced quickly and the consumer advances, and some checkpoints are emitted so that overall we have

Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}

[^connect.log.2024-04-26-10.zip]

was (Author: ecomar): Hi [~gharris1727] The task did not restart. Here are the trace logs generated in the following scenario:
clean start with the properties file posted above
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop - polls 100 messages - commitSync - waits 1 sec
e.g.

clientProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
...
while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 =
[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134 ] Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:17 AM: - Hi [~gharris1727] The task did not restart. Here are the trace logs generated in the following scenario:
clean start with the properties file posted above
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop - polls 100 messages - commitSync - waits 1 sec
e.g.

while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    polledCount = polledCount + print(crs1, consumer);
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 1. Then other 1 messages are produced quickly and the consumer advances, and some checkpoints are emitted so that overall we have

Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}

[^connect.log.2024-04-26-10.zip]

was (Author: ecomar): Hi [~gharris1727] The task did not restart. Here are the trace logs generated in the following scenario:
clean start with the properties file posted above
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop - polls 100 messages - commitSync - waits 1 sec
e.g.

while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    polledCount = polledCount + print(crs1, consumer);
    consumer.commitSync(Duration.ofSeconds(10));
[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134 ] Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:17 AM: - Hi [~gharris1727] The task did not restart. Here are the trace logs generated in the following scenario:
clean start with the properties file posted above
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop - polls 100 messages - commitSync - waits 1 sec
e.g.

clientProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
...
while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    polledCount = polledCount + print(crs1, consumer);
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 1. Then other 1 messages are produced quickly and the consumer advances, and some checkpoints are emitted so that overall we have

Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}

[^connect.log.2024-04-26-10.zip]

was (Author: ecomar): Hi [~gharris1727] The task did not restart. Here are the trace logs generated in the following scenario:
clean start with the properties file posted above
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop - polls 100 messages - commitSync - waits 1 sec
e.g.

while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    polledCount = polledCount + print(crs1,
[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134 ] Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:16 AM: - Hi [~gharris1727] The task did not restart. Here are the trace logs generated in the following scenario:
clean start with the properties file posted above
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop - polls 100 messages - commitSync - waits 1 sec
e.g.

while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    polledCount = polledCount + print(crs1, consumer);
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 1. Then other 1 messages are produced quickly and the consumer advances, and some checkpoints are emitted so that overall we have

Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}

[^connect.log.2024-04-26-10.zip]

was (Author: ecomar): Hi [~gharris1727] The task did not restart. Here are the trace logs generated in the following scenario:
clean start with the properties file posted above
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop - polls 100 messages - commitSync - waits 1 sec
e.g.

while (...) {
    ConsumerRecords crs1 = consumer.poll(Duration.ofMillis(1000L));
    // print(crs1, consumer); print last record of polled
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 1. Then other 1 messages are produced quickly and the consumer advances, and some checkpoints are emitted so that overall we have
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700,
[jira] [Commented] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134 ] Edoardo Comar commented on KAFKA-16622: --- Hi [~gharris1727] The task did not restart. Here are the trace logs generated in the following scenario: clean start with the properties file posted above; create 'mytopic', 1 topic with 1 partition; produce 10,000 messages to source cluster; start a single consumer in source cluster that in a loop - polls 100 messages - commitSync - waits 1 sec, e.g.
```
while (...) {
    ConsumerRecords crs1 = consumer.poll(Duration.ofMillis(1000L));
    // print(crs1, consumer); print last record of polled
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}
```
the first checkpoint is only emitted when the consumer catches up fully at 1. Then other 1 messages are produced quickly and the consumer advances, and some checkpoints are emitted so that overall we have
```
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}
```
[^connect.log.2024-04-26-10.zip] > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions:
3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (mirrormaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls, which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
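The slow-consumer repro above can be simulated without a cluster. The class below is a hypothetical stand-in (no Kafka APIs): an in-memory committed-offset counter replaces poll()/commitSync(), just to show how slowly the group's committed offset climbs toward the log end while mirroring runs far ahead.

```java
// Hypothetical, self-contained simulation of the repro's slow consumer:
// an in-memory log-end offset and a committed-offset counter stand in
// for KafkaConsumer.poll() and commitSync().
public class SlowConsumerSketch {

    // Consume logEnd records in batches of batchSize, "committing" after
    // each poll; returns the number of polls it takes to catch up.
    static int pollsToCatchUp(long logEnd, int batchSize) {
        long committed = 0;
        int polls = 0;
        while (committed < logEnd) {
            long end = Math.min(committed + batchSize, logEnd); // poll() analogue
            committed = end;                                    // commitSync() analogue
            polls++;
            // the real repro also does Thread.sleep(1000L) here; omitted
        }
        return polls;
    }

    public static void main(String[] args) {
        // 10,000 records at 100 per poll: 100 polls (~100 s with the 1 s
        // sleep), during which MM2 can mirror far past the committed offset.
        System.out.println(pollsToCatchUp(10_000, 100)); // 100
    }
}
```

The point of the sketch is only the timing asymmetry: the committed offset advances 100 records per second while the mirror task can replicate the whole partition almost immediately.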
[jira] [Updated] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-16622: -- Attachment: connect.log.2024-04-26-10.zip > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (mirrormaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls, which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840966#comment-17840966 ] Edoardo Comar commented on KAFKA-16622: --- Activating DEBUG logging ``` [2024-04-25 21:20:10,856] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(mygroup1,mytopic-0,13805): Skipped (OffsetSync{topicPartition=mytopic-0, upstreamOffset=1, downstreamOffset=1} is ahead of upstream consumer group 13805) (org.apache.kafka.connect.mirror.OffsetSyncStore:125) ``` The checkpoint is not emitted because the topic-partition has been mirrored further than where the consumer group is, so until the group catches up no checkpoints will be emitted. Question for [~gregharris73]: would this behavior mean that any consumer groups behind the log end, if switched from consuming from the source cluster to the target cluster, would have to reprocess the entire partition? They would have access to no translated offsets. > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Priority: Major > Attachments: edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (mirrormaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g.
fetching 50 records/poll and pausing 1 sec > between polls, which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
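The skip condition in the DEBUG line above can be illustrated with a small self-contained sketch; the class and method names below are hypothetical, not the actual OffsetSyncStore code. The idea: a group offset can only be translated using an offset sync that is not ahead of it, so while every known sync is ahead of a lagging group, no checkpoint can be emitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical simplified model of MM2 offset-sync translation: a sync maps
// an upstream offset to its downstream equivalent, and a consumer group's
// offset is only translatable via a sync at or before that offset.
public class OffsetTranslationSketch {
    record OffsetSync(long upstreamOffset, long downstreamOffset) {}

    private final List<OffsetSync> syncs = new ArrayList<>();

    void recordSync(long upstream, long downstream) {
        syncs.add(new OffsetSync(upstream, downstream));
    }

    // Empty when every known sync is ahead of the group offset -- the
    // "Skipped (... is ahead of upstream consumer group ...)" case above.
    Optional<Long> translateDownstream(long groupUpstreamOffset) {
        return syncs.stream()
                .filter(s -> s.upstreamOffset() <= groupUpstreamOffset)
                .max(Comparator.comparingLong(OffsetSync::upstreamOffset))
                .map(OffsetSync::downstreamOffset);
    }

    public static void main(String[] args) {
        OffsetTranslationSketch store = new OffsetTranslationSketch();
        // Mirroring has raced ahead: the only sync is at upstream offset 20000.
        store.recordSync(20000L, 19800L);
        // A lagging group at 13805 cannot be translated -> no checkpoint emitted.
        System.out.println(store.translateDownstream(13805L).isPresent()); // false
        // Once a sync at or before the group offset exists, translation succeeds.
        store.recordSync(13000L, 12900L);
        System.out.println(store.translateDownstream(13805L).orElse(-1L)); // 12900
    }
}
```

This is only a model of the skip condition, not of how the real store prunes and spaces its syncs (controlled by offset.lag.max).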
[jira] [Created] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
Edoardo Comar created KAFKA-16622: - Summary: Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once Key: KAFKA-16622 URL: https://issues.apache.org/jira/browse/KAFKA-16622 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 3.6.2, 3.7.0, 3.8.0 Reporter: Edoardo Comar Attachments: edo-connect-mirror-maker-sourcetarget.properties We observed an excessively delayed emission of the MM2 Checkpoint record. It only gets created when the source consumer reaches the end of a topic. This does not seem reasonable. In a very simple setup: Tested with a standalone single-process MirrorMaker2 mirroring between two single-node Kafka clusters (mirrormaker config attached) with quick refresh intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10) create a single topic in the source cluster produce data to it (e.g. 1 records) start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec between polls, which commits after each poll watch the Checkpoint topic in the target cluster bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ --topic source.checkpoints.internal \ --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ --from-beginning -> no record appears in the checkpoint topic until the consumer reaches the end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828067#comment-17828067 ] Edoardo Comar commented on KAFKA-16369: --- fix cherry-picked to 3.6 and 3.7 > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.6.2, 3.8.0, 3.7.1 > > Attachments: kraft-server.log, server.log > > > When in Zookeeper mode, if a port the broker should listen to is already bound, > the KafkaException "Socket server failed to bind to localhost:9092: Address already in use" > is thrown but the broker continues to start up. > It correctly shuts down when in KRaft mode. > Easy to reproduce when in Zookeeper mode with server.config set to listen to > localhost only: > listeners=PLAINTEXT://localhost:9092 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827976#comment-17827976 ] Edoardo Comar commented on KAFKA-16369: --- fix merged in trunk, thanks [~showuon] > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.6.2, 3.8.0, 3.7.1 > > Attachments: kraft-server.log, server.log > > > When in Zookeeper mode, if a port the broker should listen to is already bound, > the KafkaException "Socket server failed to bind to localhost:9092: Address already in use" > is thrown but the broker continues to start up. > It correctly shuts down when in KRaft mode. > Easy to reproduce when in Zookeeper mode with server.config set to listen to > localhost only: > listeners=PLAINTEXT://localhost:9092 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-16369: -- Fix Version/s: 3.8.0 > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.8.0 > > Attachments: kraft-server.log, server.log > > > When in Zookeeper mode, if a port the broker should listen to is already bound, > the KafkaException "Socket server failed to bind to localhost:9092: Address already in use" > is thrown but the broker continues to start up. > It correctly shuts down when in KRaft mode. > Easy to reproduce when in Zookeeper mode with server.config set to listen to > localhost only: > listeners=PLAINTEXT://localhost:9092 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-16369: -- Fix Version/s: 3.6.2 3.7.1 > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.6.2, 3.8.0, 3.7.1 > > Attachments: kraft-server.log, server.log > > > When in Zookeeper mode, if a port the broker should listen to is already bound, > the KafkaException "Socket server failed to bind to localhost:9092: Address already in use" > is thrown but the broker continues to start up. > It correctly shuts down when in KRaft mode. > Easy to reproduce when in Zookeeper mode with server.config set to listen to > localhost only: > listeners=PLAINTEXT://localhost:9092 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827107#comment-17827107 ] Edoardo Comar commented on KAFKA-16369: --- PR is ready for review - can anyone take a look, please? > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: kraft-server.log, server.log > > > When in Zookeeper mode, if a port the broker should listen to is already bound, > the KafkaException "Socket server failed to bind to localhost:9092: Address already in use" > is thrown but the broker continues to start up. > It correctly shuts down when in KRaft mode. > Easy to reproduce when in Zookeeper mode with server.config set to listen to > localhost only: > listeners=PLAINTEXT://localhost:9092 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-16369: -- Affects Version/s: 3.6.1 3.7.0 3.8.0 > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: kraft-server.log, server.log > > > When in Zookeeper mode, if a port the broker should listen to is already bound, > the KafkaException "Socket server failed to bind to localhost:9092: Address already in use" > is thrown but the broker continues to start up. > It correctly shuts down when in KRaft mode. > Easy to reproduce when in Zookeeper mode with server.config set to listen to > localhost only: > listeners=PLAINTEXT://localhost:9092 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17826827#comment-17826827 ] Edoardo Comar commented on KAFKA-16369: --- KafkaServer (ZK mode) needs to wait on the future returned by SocketServer.enableRequestProcessing, similarly to BrokerServer (KRaft mode). PR in progress > Broker may not shut down when SocketServer fails to bind as Address already > in use > -- > > Key: KAFKA-16369 > URL: https://issues.apache.org/jira/browse/KAFKA-16369 > Project: Kafka > Issue Type: Bug >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: kraft-server.log, server.log > > > When in Zookeeper mode, if a port the broker should listen to is already bound, > the KafkaException "Socket server failed to bind to localhost:9092: Address already in use" > is thrown but the broker continues to start up. > It correctly shuts down when in KRaft mode. > Easy to reproduce when in Zookeeper mode with server.config set to listen to > localhost only: > listeners=PLAINTEXT://localhost:9092 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
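The fix direction in the comment above (ZK-mode KafkaServer waiting on the future returned by SocketServer.enableRequestProcessing, as the KRaft-mode BrokerServer already does) can be sketched with plain CompletableFuture plumbing. The class below is hypothetical and only models the failure propagation, not the actual Kafka startup code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch of a startup path that surfaces a bind failure.
// If startup() did NOT wait on the future, the exception would be lost and
// the process would keep running half-started -- the bug described here.
public class StartupSketch {

    // Stand-in for SocketServer.enableRequestProcessing: completes
    // exceptionally when the port is already bound.
    static CompletableFuture<Void> enableRequestProcessing(boolean portFree) {
        CompletableFuture<Void> ready = new CompletableFuture<>();
        if (portFree) {
            ready.complete(null);
        } else {
            ready.completeExceptionally(
                new RuntimeException("Socket server failed to bind: Address already in use"));
        }
        return ready;
    }

    // Returns true when startup succeeded; rethrows when binding failed,
    // so the caller can shut down instead of silently continuing.
    static boolean startup(boolean portFree) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> ready = enableRequestProcessing(portFree);
        ready.get(); // waiting here is the essence of the fix
        return true;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(startup(true)); // true
        try {
            startup(false);
        } catch (ExecutionException e) {
            // in a real server this branch would trigger shutdown
            System.out.println("startup aborted: " + e.getCause().getMessage());
        }
    }
}
```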
[jira] [Assigned] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reassigned KAFKA-16369: - Assignee: Edoardo Comar
[jira] [Updated] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-16369: -- Attachment: server.log kraft-server.log
[jira] [Commented] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17826811#comment-17826811 ] Edoardo Comar commented on KAFKA-16369: --- Server logs as broker only (ZK mode) and broker/controller (KRaft mode) when port 9092 is already bound: [^server.log][^kraft-server.log]
[jira] [Comment Edited] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
[ https://issues.apache.org/jira/browse/KAFKA-16369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17826811#comment-17826811 ] Edoardo Comar edited comment on KAFKA-16369 at 3/13/24 5:50 PM: server logs as broker only (zk mode) and broker/controller (kraft mode) when port 9092 is already bound [^server.log] [^kraft-server.log] was (Author: ecomar): server logs as broker only (zk mode) and broker/controller (kraft mode) when port 9092 is already bound [^server.log][^kraft-server.log]
[jira] [Created] (KAFKA-16369) Broker may not shut down when SocketServer fails to bind as Address already in use
Edoardo Comar created KAFKA-16369: - Summary: Broker may not shut down when SocketServer fails to bind as Address already in use Key: KAFKA-16369 URL: https://issues.apache.org/jira/browse/KAFKA-16369 Project: Kafka Issue Type: Bug Reporter: Edoardo Comar When in ZooKeeper mode, if a port the broker should listen on is already bound, the KafkaException "Socket server failed to bind to localhost:9092: Address already in use" is thrown but the broker continues to start up. It correctly shuts down when in KRaft mode. Easy to reproduce in ZooKeeper mode with server.config set to listen on localhost only: {color:#00}listeners={color}{color:#a31515}PLAINTEXT://localhost:9092{color}
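The failure mode being reproduced is simply a second listener binding a port that is already in use. A minimal self-contained sketch, independent of Kafka (the port is picked by the OS, so this is safe to run anywhere):

```python
import socket


def bind_twice(host="127.0.0.1"):
    """Grab an ephemeral port with one listener, then try to bind a second
    socket to the same port, mimicking a broker starting while its
    listener port is already taken."""
    first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    first.bind((host, 0))  # let the OS pick a free port
    first.listen()
    port = first.getsockname()[1]

    second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        second.bind((host, port))  # fails with EADDRINUSE
        return "bound"
    except OSError:
        return "address already in use"
    finally:
        second.close()
        first.close()
```

The second bind raises OSError (EADDRINUSE), which is the condition the broker hits; the ticket's point is that in ZK mode this exception did not abort startup.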
[jira] [Comment Edited] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740748#comment-17740748 ] Edoardo Comar edited comment on KAFKA-15144 at 7/6/23 8:56 PM: --- I'd rather rewrite our test expecting the record to be found consuming no more than offset.lag.max=100 records from the target cluster, starting from the last checkpoint was (Author: ecomar): I'll rather rewrite our test expecting the record to be found consuming no more than offset.lag.max=100 records from the target cluster > MM2 Checkpoint downstreamOffset stuck to 1 > -- > > Key: KAFKA-15144 > URL: https://issues.apache.org/jira/browse/KAFKA-15144 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: edo-connect-mirror-maker-sourcetarget.properties > > > Steps to reproduce: > 1. Start the source cluster > 2. Start the target cluster > 3. Start connect-mirror-maker.sh using a config like the attached > 4. Create a topic in the source cluster > 5. Produce a few messages > 6. Consume them all with autocommit enabled > > 7. Then dump the Checkpoint topic content, e.g. > {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic > source.checkpoints.internal --from-beginning --formatter > org.apache.kafka.connect.mirror.formatters.CheckpointFormatter}} > {{{}Checkpoint{consumerGroupId=edogroup, > topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, > {*}downstreamOffset=1{*}, metadata={ > > The downstreamOffset remains at 1, while, in a fresh cluster pair, i.e. with > the source topic created while MM2 is running, > I'd expect the downstreamOffset to match the upstreamOffset. > Note that dumping the offset sync topic shows matching initial offsets: > {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic > mm2-offset-syncs.source.internal --from-beginning --formatter > org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter}} > {{{}OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, > downstreamOffset=0{ > >
[jira] [Commented] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740748#comment-17740748 ] Edoardo Comar commented on KAFKA-15144: --- I'd rather rewrite our test expecting the record to be found consuming no more than offset.lag.max=100 records from the target cluster, starting from the last checkpoint
[jira] [Comment Edited] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740550#comment-17740550 ] Edoardo Comar edited comment on KAFKA-15144 at 7/6/23 11:03 AM: Thanks [~gharris1727] I had a good look at the code and now it's all clear. Except that we could not find very good docs on how to consume the Checkpoints ... so in the past one of our tests was written expecting to find the exact match for upstream/downsteram because that was the externally visible behavior when all wasd good. Cheers was (Author: ecomar): Thanks [~gharris1727] > MM2 Checkpoint downstreamOffset stuck to 1 > -- > > Key: KAFKA-15144 > URL: https://issues.apache.org/jira/browse/KAFKA-15144 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: edo-connect-mirror-maker-sourcetarget.properties > > > Steps to reproduce : > 1.Start the source cluster > 2.Start the target cluster > 3.Start connect-mirror-maker.sh using a config like the attached > 4.Create a topic in source cluster > 5.produce a few messages > 6.consume them all with autocommit enabled > > 7. then dump the Checkpoint topic content e.g. > {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic > source.checkpoints.internal --from-beginning --formatter > org.apache.kafka.connect.mirror.formatters.CheckpointFormatter}} > {{{}Checkpoint{consumerGroupId=edogroup, > topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, > {*}downstreamOffset=1{*}, metadata={ > > the downstreamOffset remains at 1, while, in a fresh cluster pair like with > the source topic created while MM2 is running, > I'd expect the downstreamOffset to match the upstreamOffset. 
> Note that dumping the offset sync topic, shows matching initial offsets > {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic > mm2-offset-syncs.source.internal --from-beginning --formatter > org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter}} > {{{}OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, > downstreamOffset=0{ > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
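The reworked test mentioned in the comment above (find the record within offset.lag.max=100 records of the checkpoint, rather than at an exact offset) can be modelled as a bounded scan. A hedged Python illustration; the function name and record shape are hypothetical, and the window size is taken from the comment:

```python
def find_within_window(records, start_offset, target_value, window=100):
    """Scan at most `window` records starting at the translated checkpoint
    offset, returning the offset where `target_value` is found, or None.

    Models the test strategy: after failover, the expected record must
    appear within offset.lag.max records of the checkpoint, not at an
    exact matching offset.
    """
    for off, value in records:
        if off < start_offset:
            continue  # before the checkpoint: already consumed upstream
        if off >= start_offset + window:
            break  # outside the offset.lag.max window: give up
        if value == target_value:
            return off
    return None
```

For example, with a checkpoint translated to offset 102 and the target record actually sitting at offset 118, the scan finds it well inside the 100-record window.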
[jira] [Commented] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740550#comment-17740550 ] Edoardo Comar commented on KAFKA-15144: --- Thanks [~gharris1727]
[jira] [Resolved] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar resolved KAFKA-15144. --- Resolution: Not A Bug Closing as not a bug. The "problem" arose because, without any config changes, updating MM2 from 3.3.2 to a later release considerably changed the observable content of the Checkpoint topic. In 3.3.2, even without new records in the OffsetSync topic, the Checkpoint records advanced often (and even contained many duplicates). Now gaps of up to offset.lag.max must be expected, and more reprocessing of records downstream may occur.
[jira] [Comment Edited] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739883#comment-17739883 ] Edoardo Comar edited comment on KAFKA-15144 at 7/4/23 12:12 PM: again, with {color:#00}offset.lag.max={color}{color:#a31515}100{color} producing at an irregular rate with a console producer, the checkpoints are unexpected to me OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=101, downstreamOffset=101} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=202, downstreamOffset=202} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=1, downstreamOffset=1, metadata=} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=118, downstreamOffset=102, metadata=} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=227, downstreamOffset=203, metadata=} so a consumer using the Checkpoint when switching from source to target is guaranteed not to miss any messages, but may reprocess quite a few, correct? was (Author: ecomar): again, with {color:#00}offset.lag.max={color}{color:#a31515}100{color} producing at irregular rate with a console producer, the checkpoints are unexpected to me OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=101, downstreamOffset=101} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=202, downstreamOffset=202} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=1, downstreamOffset=1, metadata=} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=118, downstreamOffset=102, metadata=} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=227, downstreamOffset=203, metadata=}
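The checkpoint values in the dump above (upstream 1 → downstream 1, 118 → 102, 227 → 203, with syncs at 0/0, 101/101, 202/202) are consistent with a conservative translation rule, sketched below. This is a hedged illustration under the assumption that an offset strictly ahead of the last known sync translates to that sync's downstreamOffset + 1; the function below is a hypothetical model, not MM2's actual OffsetSyncStore code:

```python
def translate(syncs, group_offset):
    """Translate an upstream consumer-group offset to a downstream offset.

    `syncs` is a list of (upstream, downstream) pairs in ascending order,
    as read from the mm2-offset-syncs topic. Sketch of a conservative
    rule: pick the latest sync at or below the group offset; if the group
    offset is strictly ahead of it, map to sync downstream + 1, so a
    failed-over consumer never skips records but may reprocess up to
    offset.lag.max of them.
    """
    best = None
    for up, down in syncs:
        if up <= group_offset:
            best = (up, down)  # latest sync not past the group offset
    if best is None:
        return None  # no usable sync: offset is untranslatable
    up, down = best
    return down if group_offset == up else down + 1
```

Applied to the dump above, this rule reproduces every checkpoint in it, which supports the reading that the gaps are bounded by offset.lag.max rather than being a bug.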
[jira] [Commented] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739883#comment-17739883 ] Edoardo Comar commented on KAFKA-15144: --- again, with {color:#00}offset.lag.max={color}{color:#a31515}100{color} producing at an irregular rate with a console producer, the checkpoints are unexpected to me OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=101, downstreamOffset=101} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=202, downstreamOffset=202} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=1, downstreamOffset=1, metadata=} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=118, downstreamOffset=102, metadata=} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=227, downstreamOffset=203, metadata=}
[jira] [Comment Edited] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739868#comment-17739868 ] Edoardo Comar edited comment on KAFKA-15144 at 7/4/23 10:32 AM: Experimenting a bit, setting {color:#00}offset.lag.max={color}{color:#a31515}1{color} and then running a console producer and consumer on the source, I still see puzzling checkpoints despite what looks a correct offset sync : OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=2, downstreamOffset=2} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=4, downstreamOffset=4} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=6, downstreamOffset=6} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=8, downstreamOffset=8} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=10, downstreamOffset=10} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=12, downstreamOffset=12} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=14, downstreamOffset=14} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=16, downstreamOffset=16} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=18, downstreamOffset=18} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=20, downstreamOffset=20} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=22, downstreamOffset=22} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=24, downstreamOffset=24} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=26, downstreamOffset=26} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=28, downstreamOffset=28} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=30, downstreamOffset=30} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=32, downstreamOffset=32} 
OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=34, downstreamOffset=34} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=36, downstreamOffset=36} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=38, downstreamOffset=38} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=40, downstreamOffset=40} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=42, downstreamOffset=42} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=44, downstreamOffset=44} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=46, downstreamOffset=46} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=48, downstreamOffset=48} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=50, downstreamOffset=50} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=52, downstreamOffset=52} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=54, downstreamOffset=54} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=56, downstreamOffset=56} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=1, downstreamOffset=1, metadata=} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=2, downstreamOffset=2, metadata=} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, downstreamOffset=3, metadata=} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=4, downstreamOffset=4, metadata=} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=5, downstreamOffset=5, metadata=} Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, {*}upstreamOffset=30, downstreamOffset=29{*}, metadata=} Checkpoint {consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, 
*upstreamOffset=52, downstreamOffset=51,* metadata=} Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=57, downstreamOffset=57, metadata=} I'd expect the downstreamOffset to always be the same as the upstreamOffset: there are no transactions, and the topic has been created fresh with MM2 running. The discrepancy observed above is not present if {color:#00}offset.lag.max={color}{color:#a31515}0{color}, but that comes at the cost of emitting far more offset syncs was (Author: ecomar): Experimenting a bit, setting {color:#00}offset.lag.max={color}{color:#a31515}1{color} and then running a console producer and consumer on the source, I still see puzzling checkpoints despite what looks like a correct offset sync : OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0}
[jira] [Comment Edited] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739868#comment-17739868 ] Edoardo Comar edited comment on KAFKA-15144 at 7/4/23 10:32 AM: Experimenting a bit, setting {color:#00}offset.lag.max={color}{color:#a31515}1{color} and then running a console producer and consumer on the source, I still see puzzling checkpoints despite what looks a correct offset sync : OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=2, downstreamOffset=2} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=4, downstreamOffset=4} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=6, downstreamOffset=6} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=8, downstreamOffset=8} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=10, downstreamOffset=10} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=12, downstreamOffset=12} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=14, downstreamOffset=14} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=16, downstreamOffset=16} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=18, downstreamOffset=18} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=20, downstreamOffset=20} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=22, downstreamOffset=22} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=24, downstreamOffset=24} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=26, downstreamOffset=26} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=28, downstreamOffset=28} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=30, downstreamOffset=30} OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=32, downstreamOffset=32} 
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=34, downstreamOffset=34}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=36, downstreamOffset=36}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=38, downstreamOffset=38}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=40, downstreamOffset=40}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=42, downstreamOffset=42}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=44, downstreamOffset=44}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=46, downstreamOffset=46}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=48, downstreamOffset=48}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=50, downstreamOffset=50}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=52, downstreamOffset=52}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=54, downstreamOffset=54}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=56, downstreamOffset=56}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=2, downstreamOffset=2, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, downstreamOffset=3, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=4, downstreamOffset=4, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=5, downstreamOffset=5, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, *upstreamOffset=30, downstreamOffset=29*, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, *upstreamOffset=52, downstreamOffset=51*, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=57, downstreamOffset=57, metadata=}

I'd expect the downstreamOffset to always match the upstreamOffset: there are no transactions, and the topic was created fresh with MM2 running. The discrepancy observed above is not present if offset.lag.max=0, but that comes at the cost of emitting far more offset syncs.
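The *upstreamOffset=30, downstreamOffset=29* and *upstreamOffset=52, downstreamOffset=51* checkpoints above are consistent with a conservative translation performed relative to only a retained subset of the offset syncs. The following Python sketch illustrates that idea; it is an assumed model, not the actual OffsetSyncStore code, and the retained-sync list in the example is hypothetical:

```python
def translate_downstream(retained_syncs, upstream_offset):
    """Conservatively translate an upstream consumer offset to a downstream
    offset, given only the (upstream, downstream) syncs the store retained,
    sorted ascending by upstream offset."""
    best = None
    for up, down in retained_syncs:
        if up <= upstream_offset:
            best = (up, down)  # nearest retained sync at or before the offset
    if best is None:
        return -1  # no sync at or before this offset: cannot translate
    up, down = best
    # Exactly at the sync, its downstream offset is safe to use. Past the
    # sync, only down + 1 is known safe: records between the sync and the
    # consumer offset may map to different downstream offsets.
    return down if upstream_offset == up else down + 1

# If the store retained only the syncs (0,0) and (28,28), a consumer offset
# of 30 translates to 29 -- matching a checkpoint like upstreamOffset=30,
# downstreamOffset=29 even though a sync (30,30) was emitted on the wire.
retained = [(0, 0), (28, 28)]
print(translate_downstream(retained, 30))  # 29
print(translate_downstream(retained, 28))  # 28
```

Under this model the checkpoint lag is bounded by how far the consumer offset sits past the nearest retained sync, which is why denser syncs (offset.lag.max=0) make the discrepancy disappear.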
[jira] [Commented] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739868#comment-17739868 ] Edoardo Comar commented on KAFKA-15144: ---

Experimenting a bit, setting offset.lag.max=1 and then running a console producer and consumer on the source, I still see puzzling checkpoints despite what looks like a correct offset sync:

OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=2, downstreamOffset=2}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=4, downstreamOffset=4}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=6, downstreamOffset=6}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=8, downstreamOffset=8}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=10, downstreamOffset=10}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=12, downstreamOffset=12}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=14, downstreamOffset=14}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=16, downstreamOffset=16}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=18, downstreamOffset=18}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=20, downstreamOffset=20}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=22, downstreamOffset=22}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=24, downstreamOffset=24}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=26, downstreamOffset=26}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=28, downstreamOffset=28}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=30, downstreamOffset=30}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=32, downstreamOffset=32}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=34, downstreamOffset=34}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=36, downstreamOffset=36}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=38, downstreamOffset=38}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=40, downstreamOffset=40}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=42, downstreamOffset=42}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=44, downstreamOffset=44}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=46, downstreamOffset=46}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=48, downstreamOffset=48}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=50, downstreamOffset=50}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=52, downstreamOffset=52}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=54, downstreamOffset=54}
OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=56, downstreamOffset=56}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=2, downstreamOffset=2, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, downstreamOffset=3, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=4, downstreamOffset=4, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=5, downstreamOffset=5, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, *upstreamOffset=30, downstreamOffset=29*, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, *upstreamOffset=52, downstreamOffset=51*, metadata=}
Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=57, downstreamOffset=57, metadata=}

I'd expect the downstreamOffset to always match the upstreamOffset: there are no transactions, and the topic was created fresh with MM2 running.
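The syncs landing at every other offset (0, 2, 4, ...) with offset.lag.max=1 are consistent with a lag-gated emission rule. A minimal Python sketch of that assumed behavior (not the actual MirrorSourceTask code; the exact comparison it uses is an assumption here):

```python
def sync_offsets(record_offsets, offset_lag_max):
    """Return the upstream offsets at which an offset sync would be emitted,
    assuming a sync goes out whenever the upstream offset has advanced more
    than offset_lag_max past the last emitted sync."""
    emitted, last = [], None
    for up in record_offsets:
        if last is None or up - last > offset_lag_max:
            emitted.append(up)
            last = up
    return emitted

# offset.lag.max=1 over offsets 0..10 emits every other offset, matching
# the observed dump (0, 2, 4, ...):
print(sync_offsets(range(11), 1))   # [0, 2, 4, 6, 8, 10]
# offset.lag.max=0 emits a sync for every record:
print(sync_offsets(range(5), 0))    # [0, 1, 2, 3, 4]
```

Under this model, offset.lag.max trades checkpoint precision for offset-sync volume: smaller values emit more syncs but keep translated downstream offsets closer to the upstream ones.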
[jira] [Commented] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739867#comment-17739867 ] Edoardo Comar commented on KAFKA-15144: ---

Thanks [~gharris1727]! I had realized that the lack of offset sync records was the cause of the downstream checkpoint not advancing. What I had missed is that the external behavior changed even though the configuration was kept unchanged.

> MM2 Checkpoint downstreamOffset stuck to 1
> ------------------------------------------
>
> Key: KAFKA-15144
> URL: https://issues.apache.org/jira/browse/KAFKA-15144
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Reporter: Edoardo Comar
> Assignee: Edoardo Comar
> Priority: Major
> Attachments: edo-connect-mirror-maker-sourcetarget.properties
>
> Steps to reproduce:
> 1. Start the source cluster
> 2. Start the target cluster
> 3. Start connect-mirror-maker.sh using a config like the attached
> 4. Create a topic in the source cluster
> 5. Produce a few messages
> 6. Consume them all with autocommit enabled
> 7. Then dump the Checkpoint topic content, e.g.
> % bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic source.checkpoints.internal --from-beginning --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter
> Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, *downstreamOffset=1*, metadata=}
> The downstreamOffset remains at 1, while, in a fresh cluster pair, with the source topic created while MM2 is running, I'd expect the downstreamOffset to match the upstreamOffset.
> Note that dumping the offset sync topic shows matching initial offsets:
> % bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic mm2-offset-syncs.source.internal --from-beginning --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter
> OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739688#comment-17739688 ] Edoardo Comar commented on KAFKA-15144: ---

Producing 1 record at a time with the console producer, while a consumer is polling (on the source), the MM2 logs report:

[2023-07-03 17:44:49,479] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(edogroup,vf-mirroring-test-edo-0,0): Translated 0 (relative to OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0}) (org.apache.kafka.connect.mirror.OffsetSyncStore:161)
[2023-07-03 17:44:49,479] TRACE [MirrorCheckpointConnector|task-0] Emitting Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0, metadata=} (first for this partition) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:204)
...
[2023-07-03 17:44:54,510] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(edogroup,vf-mirroring-test-edo-0,1): Translated 1 (relative to OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0}) (org.apache.kafka.connect.mirror.OffsetSyncStore:161)
[2023-07-03 17:44:54,511] TRACE [MirrorCheckpointConnector|task-0] Emitting Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=1, downstreamOffset=1, metadata=} (downstream offset advanced) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:214)
...
[2023-07-03 17:45:04,547] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(edogroup,vf-mirroring-test-edo-0,2): Translated 1 (relative to OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0}) (org.apache.kafka.connect.mirror.OffsetSyncStore:161)
[2023-07-03 17:45:04,548] TRACE [MirrorCheckpointConnector|task-0] *Skipping* Checkpoint{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, *upstreamOffset=2, downstreamOffset=1*, metadata=} (repeated checkpoint) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:220)
[jira] [Comment Edited] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739688#comment-17739688 ] Edoardo Comar edited comment on KAFKA-15144 at 7/3/23 4:52 PM: --- producing 1 record at a time with the console producer, while a consumer is polling (on source), the MM2 logs report:
{{[2023-07-03 17:44:49,479] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(edogroup,vf-mirroring-test-edo-0,0): Translated 0 (relative to OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0}) (org.apache.kafka.connect.mirror.OffsetSyncStore:161)}}
{{[2023-07-03 17:44:49,479] TRACE [MirrorCheckpointConnector|task-0] *Emitting* Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, *upstreamOffset=0, downstreamOffset=0*, metadata=} (first for this partition) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:204)}}
{{...}}
{{[2023-07-03 17:44:54,510] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(edogroup,vf-mirroring-test-edo-0,1): Translated 1 (relative to OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0}) (org.apache.kafka.connect.mirror.OffsetSyncStore:161)}}
{{[2023-07-03 17:44:54,511] TRACE [MirrorCheckpointConnector|task-0] *Emitting* Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, *upstreamOffset=1, downstreamOffset=1*, metadata=} (downstream offset advanced) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:214)}}
{{...}}
{{[2023-07-03 17:45:04,547] DEBUG [MirrorCheckpointConnector|task-0] translateDownstream(edogroup,vf-mirroring-test-edo-0,2): Translated 1 (relative to OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0}) (org.apache.kafka.connect.mirror.OffsetSyncStore:161)}}
{{[2023-07-03 17:45:04,548] TRACE [MirrorCheckpointConnector|task-0] *Skipping* Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, *upstreamOffset=2, downstreamOffset=1*, metadata=} (repeated checkpoint) (org.apache.kafka.connect.mirror.MirrorCheckpointTask:220)}}
> MM2 Checkpoint downstreamOffset stuck to 1 > -- > > Key: KAFKA-15144 > URL: https://issues.apache.org/jira/browse/KAFKA-15144 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: edo-connect-mirror-maker-sourcetarget.properties > > > Steps to reproduce : > 1.Start the source cluster > 2.Start the target cluster > 3.Start connect-mirror-maker.sh using a config like the attached > 4.Create a topic in source cluster > 5.produce a few messages > 6.consume them all with autocommit enabled > > 7. then dump the
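The *Emitting*/*Skipping* decision visible in the log lines above can be sketched as follows. This is a simplified illustration with hypothetical names, not the actual MirrorCheckpointTask code: a checkpoint is emitted only the first time a partition is seen, or when its translated downstream offset advances; otherwise it is skipped as a repeated checkpoint.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the emit/skip decision seen in the MM2 TRACE logs.
public class CheckpointFilterSketch {
    private final Map<String, Long> lastDownstream = new HashMap<>();

    /** Returns true if a checkpoint for this partition should be emitted. */
    public boolean shouldEmit(String topicPartition, long downstreamOffset) {
        Long previous = lastDownstream.get(topicPartition);
        if (previous == null || downstreamOffset > previous) {
            // first checkpoint for this partition, or downstream offset advanced
            lastDownstream.put(topicPartition, downstreamOffset);
            return true;
        }
        return false; // repeated checkpoint: skip
    }

    public static void main(String[] args) {
        CheckpointFilterSketch f = new CheckpointFilterSketch();
        // Mirrors the log sequence: emit at 0, emit at 1, skip the repeat at 1.
        System.out.println(f.shouldEmit("vf-mirroring-test-edo-0", 0)); // true
        System.out.println(f.shouldEmit("vf-mirroring-test-edo-0", 1)); // true
        System.out.println(f.shouldEmit("vf-mirroring-test-edo-0", 1)); // false
    }
}
```

Under this rule the third checkpoint (upstream 2, downstream still 1) is skipped, matching the "repeated checkpoint" log line.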
[jira] [Updated] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-15144: -- Description: Steps to reproduce : 1. Start the source cluster 2. Start the target cluster 3. Start connect-mirror-maker.sh using a config like the attached 4. Create a topic in the source cluster 5. Produce a few messages 6. Consume them all with autocommit enabled 7. Then dump the Checkpoint topic content, e.g.
{{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic source.checkpoints.internal --from-beginning --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter}}
{{Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, downstreamOffset=1, metadata=}}}
The downstreamOffset remains at 1, while in a fresh cluster pair, with the source topic created while MM2 is running, I'd expect the downstreamOffset to match the upstreamOffset. Note that dumping the offset sync topic shows matching initial offsets:
{{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic mm2-offset-syncs.source.internal --from-beginning --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter}}
{{OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0}}}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15144) Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-15144: -- Description: Steps to reproduce : 1. Start the source cluster 2. Start the target cluster 3. Start connect-mirror-maker.sh using a config like the attached 4. Create a topic in the source cluster 5. Produce a few messages 6. Consume them all with autocommit enabled 7. Then dump the Checkpoint topic content, e.g.
{{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic source.checkpoints.internal --from-beginning --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter}}
{{Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, downstreamOffset=1, metadata=}}}
The downstreamOffset remains at 1, while in a fresh cluster pair, with the source topic created while MM2 is running, I'd expect the downstreamOffset to match the upstreamOffset. Note that dumping the offset sync topic shows matching initial offsets:
{{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic mm2-offset-syncs.source.internal --from-beginning --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter}}
{{OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0}}}
was: Steps to reproduce : Start source cluster Start target cluster start connect-mirror-maker.sh using a config like the attached create topic in source cluster produce a few messages consume them all with autocommit enabled then dumping the Checkpoint topic content e.g.
% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic source.checkpoints.internal --from-beginning --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, downstreamOffset=1, metadata=} the downstreamOffset remains at 1, while, in a fresh cluster pair with the source topic created while MM2 is running, I'd expect the downstreamOffset to match the upstreamOffset. dumping the offset sync topic % bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic mm2-offset-syncs.source.internal --from-beginning --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter shows matching initial offsets OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
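A translation rule consistent with the numbers reported in this issue (upstream offset 3 translating to downstream 1 against a single OffsetSync at upstream 0 / downstream 0) can be sketched as below. This is an illustration of the conservative behaviour implied by the logs, not the exact OffsetSyncStore code:

```java
// Hypothetical sketch: translate an upstream consumer offset to a downstream
// offset using the nearest known OffsetSync at or below it. Past the sync
// point, only "sync.downstream + 1" is known to be a safe resume position.
public class TranslateDownstreamSketch {
    static long translateDownstream(long upstreamOffset,
                                    long syncUpstream, long syncDownstream) {
        // Conservative: we cannot assume offsets advance in lockstep beyond
        // the last sync, so we only ever step one record past it.
        return syncDownstream + (upstreamOffset > syncUpstream ? 1 : 0);
    }

    public static void main(String[] args) {
        // Matches the issue: with only OffsetSync{upstream=0, downstream=0}
        // stored, upstream 0 -> 0, upstream 1 -> 1, upstream 3 -> still 1.
        System.out.println(translateDownstream(0, 0, 0));
        System.out.println(translateDownstream(1, 0, 0));
        System.out.println(translateDownstream(3, 0, 0));
    }
}
```

Under this rule the checkpoint's downstreamOffset stays pinned at 1 until a newer OffsetSync is recorded, which is the symptom described above.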
[jira] [Updated] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-15144: -- Summary: MM2 Checkpoint downstreamOffset stuck to 1 (was: Checkpoint downstreamOffset stuck to 1) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15144) Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reassigned KAFKA-15144: - Assignee: Edoardo Comar -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15144) Checkpoint downstreamOffset stuck to 1
Edoardo Comar created KAFKA-15144: - Summary: Checkpoint downstreamOffset stuck to 1 Key: KAFKA-15144 URL: https://issues.apache.org/jira/browse/KAFKA-15144 Project: Kafka Issue Type: Bug Components: mirrormaker Reporter: Edoardo Comar Attachments: edo-connect-mirror-maker-sourcetarget.properties Steps to reproduce : Start source cluster Start target cluster start connect-mirror-maker.sh using a config like the attached create topic in source cluster produce a few messages consume them all with autocommit enabled then dumping the Checkpoint topic content e.g. % bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic source.checkpoints.internal --from-beginning --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter Checkpoint\{consumerGroupId=edogroup, topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, downstreamOffset=1, metadata=} the downstreamOffset remains at 1, while, in a fresh cluster pair with the source topic created while MM2 is running, I'd expect the downstreamOffset to match the upstreamOffset. dumping the offset sync topic % bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic mm2-offset-syncs.source.internal --from-beginning --formatter org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter shows matching initial offsets OffsetSync\{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, downstreamOffset=0} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15133) RequestMetrics MessageConversionsTimeMs count is ticked even when no conversion occurs
[ https://issues.apache.org/jira/browse/KAFKA-15133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738205#comment-17738205 ] Edoardo Comar commented on KAFKA-15133: --- [~rsivaram] - reading KIP 188, I am not sure if the intent of the MessageConversionsTimeMs Histogram was also to record the number of times there was no conversion, so that if only, say, 1% of messages required conversions, we'd see that in the percentile distribution. I found this while comparing the count with the one in BrokerTopicMetrics, where the mismatch was obvious. > RequestMetrics MessageConversionsTimeMs count is ticked even when no > conversion occurs > -- > > Key: KAFKA-15133 > URL: https://issues.apache.org/jira/browse/KAFKA-15133 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.5.0, 3.4.1 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Minor > > The Histogram > {{RequestChannel.messageConversionsTimeHist}} > is ticked even when a Produce/Fetch request incurred no conversion, > because a new entry is added to the histogram distribution, with a 0ms value. > > It's confusing comparing the Histogram > kafka.network RequestMetrics MessageConversionsTimeMs > with the Meter > kafka.server BrokerTopicMetrics ProduceMessageConversionsPerSec > because for the latter, the metric is ticked only if a conversion actually > occurred -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15133) RequestMetrics MessageConversionsTimeMs count is ticked even when no conversion occurs
Edoardo Comar created KAFKA-15133: - Summary: RequestMetrics MessageConversionsTimeMs count is ticked even when no conversion occurs Key: KAFKA-15133 URL: https://issues.apache.org/jira/browse/KAFKA-15133 Project: Kafka Issue Type: Bug Components: core Affects Versions: 3.4.1, 3.5.0 Reporter: Edoardo Comar Assignee: Edoardo Comar The Histogram {{RequestChannel.messageConversionsTimeHist}} is ticked even when a Produce/Fetch request incurred no conversion, because a new entry is added to the histogram distribution, with a 0ms value. It's confusing comparing the Histogram kafka.network RequestMetrics MessageConversionsTimeMs with the Meter kafka.server BrokerTopicMetrics ProduceMessageConversionsPerSec because for the latter, the metric is ticked only if a conversion actually occurred -- This message was sent by Atlassian Jira (v8.20.10#820010)
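The mismatch between the two metrics can be illustrated with a small sketch (hypothetical class, not the actual Kafka code): the histogram should arguably receive a sample only when a conversion actually occurred, which is how the ProduceMessageConversionsPerSec meter already behaves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the fix this issue suggests: only record a
// conversion-time sample when a conversion really happened.
public class ConversionMetricsSketch {
    private final List<Long> histogramSamples = new ArrayList<>();

    public void recordRequest(boolean conversionOccurred, long conversionTimeMs) {
        // Ticking unconditionally would flood the distribution with 0 ms
        // samples, which is the confusing behaviour the issue describes.
        if (conversionOccurred) {
            histogramSamples.add(conversionTimeMs);
        }
    }

    public int sampleCount() {
        return histogramSamples.size();
    }

    public static void main(String[] args) {
        ConversionMetricsSketch m = new ConversionMetricsSketch();
        m.recordRequest(false, 0); // no conversion: histogram untouched
        m.recordRequest(true, 12); // real conversion: one sample recorded
        System.out.println(m.sampleCount());
    }
}
```

With this guard, the histogram count and the conversions-per-second meter count move together, so percentile distributions reflect only requests that paid a conversion cost.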
[jira] [Commented] (KAFKA-14996) The KRaft controller should properly handle overly large user operations
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726688#comment-17726688 ] Edoardo Comar commented on KAFKA-14996: --- Opened a PR to allow responding gracefully with an INVALID_PARTITION error to the clients: https://github.com/apache/kafka/pull/13766 > The KRaft controller should properly handle overly large user operations > > > Key: KAFKA-14996 > URL: https://issues.apache.org/jira/browse/KAFKA-14996 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 3.5.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Blocker > > If an attempt is made to create a topic with > num partitions >= QuorumController.MAX_RECORDS_PER_BATCH (10000) > the client receives an UnknownServerException - it could rather receive a > better error. > The controller logs > {{[2023-05-12 19:25:10,018] WARN [QuorumController id=1] createTopics: failed > with unknown server exception IllegalStateException at epoch 2 in 21956 us. > Renouncing leadership and reverting to the last committed offset 174. > (org.apache.kafka.controller.QuorumController)}} > {{java.lang.IllegalStateException: Attempted to atomically commit 10001 > records, but maxRecordsPerBatch is 10000}} > {{ at > org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:812)}} > {{ at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)}} > {{ at java.base/java.lang.Thread.run(Thread.java:829)}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
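The graceful handling the PR above aims for can be sketched roughly as follows. All names here are hypothetical, and the "+ 1" allowance for the topic record itself is an assumption for illustration: the idea is to reject an oversized createTopics request with a client-facing error before the controller attempts to append the batch and throws.

```java
// Hypothetical sketch: validate partition count against the batch limit up
// front, instead of letting the controller fail with IllegalStateException.
public class CreateTopicsValidationSketch {
    // The controller log above implies this limit is 10000 (10001 records
    // exceeded it).
    static final int MAX_RECORDS_PER_BATCH = 10_000;

    /** Returns an error description, or null if the request is acceptable. */
    static String validateNumPartitions(int numPartitions) {
        // Assumption: one record per partition plus one for the topic itself.
        if (numPartitions + 1 > MAX_RECORDS_PER_BATCH) {
            return "INVALID_PARTITIONS: " + numPartitions
                    + " partitions would exceed the controller batch limit of "
                    + MAX_RECORDS_PER_BATCH + " records";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(validateNumPartitions(100));    // accepted
        System.out.println(validateNumPartitions(10_000)); // rejected gracefully
    }
}
```

Validating before appending keeps the failure on the client side as a normal error response, rather than making the active controller renounce leadership.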
[jira] [Commented] (KAFKA-14996) CreateTopic fails with UnknownServerException if num partitions >= QuorumController.MAX_RECORDS_PER_BATCH
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725858#comment-17725858 ] Edoardo Comar commented on KAFKA-14996: --- I found another way to get into the error state. 3 broker/controller cluster, all 3 voters. If I shut down the 2 non-active quorum members, the remaining active controller enters the state where it logs `[2023-05-24 16:29:45,129] WARN [BrokerToControllerChannelManager id=1 name=heartbeat] Received error UNKNOWN_SERVER_ERROR from node 1 when making an ApiVersionsRequest with correlation id 3945. Disconnecting. (org.apache.kafka.clients.NetworkClient)` and correspondingly ``` [2023-05-24 16:29:45,128] WARN [QuorumController id=1] getFinalizedFeatures: failed with unknown server exception RuntimeException in 222 us. The controller is already in standby mode. (org.apache.kafka.controller.QuorumController) java.lang.RuntimeException: No in-memory snapshot for epoch 159730. Snapshot epochs are: at org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:173) at org.apache.kafka.timeline.SnapshotRegistry.iterator(SnapshotRegistry.java:131) at org.apache.kafka.timeline.TimelineObject.get(TimelineObject.java:69) at org.apache.kafka.controller.FeatureControlManager.finalizedFeatures(FeatureControlManager.java:303) at org.apache.kafka.controller.QuorumController.lambda$finalizedFeatures$16(QuorumController.java:2016) at org.apache.kafka.controller.QuorumController$ControllerReadEvent.run(QuorumController.java:546) ``` in controller.log > CreateTopic fails with UnknownServerException if num partitions >= > QuorumController.MAX_RECORDS_PER_BATCH > -- > > Key: KAFKA-14996 > URL: https://issues.apache.org/jira/browse/KAFKA-14996 > Project: Kafka > Issue Type: Bug > Components: controller >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Critical > > If an attempt is made to create a topic with > num partitions >= QuorumController.MAX_RECORDS_PER_BATCH (10000) > the client receives an UnknownServerException - it could rather receive a > better error. > The controller logs > {{[2023-05-12 19:25:10,018] WARN [QuorumController id=1] createTopics: failed > with unknown server exception IllegalStateException at epoch 2 in 21956 us. > Renouncing leadership and reverting to the last committed offset 174. > (org.apache.kafka.controller.QuorumController)}} > {{java.lang.IllegalStateException: Attempted to atomically commit 10001 > records, but maxRecordsPerBatch is 10000}} > {{ at > org.apache.kafka.controller.QuorumController.appendRecords(QuorumController.java:812)}} > {{ at > org.apache.kafka.controller.QuorumController$ControllerWriteEvent.run(QuorumController.java:719)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)}} > {{ at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)}} > {{ at java.base/java.lang.Thread.run(Thread.java:829)}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14996) CreateTopic fails with UnknownServerException if num partitions >= QuorumController.MAX_RECORDS_PER_BATCH
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724946#comment-17724946 ] Edoardo Comar edited comment on KAFKA-14996 at 5/22/23 2:15 PM: The controller instability is not reproducible with 3.4 (at the git commit `2f13471181`), so it must be a regression. Also 3.5 `10189d6159` does not exhibit the controller bug. was (Author: ecomar): The controller instability is not reproducible with 3.4 (at the git commit `2f13471181`), so it must be a regression. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14996) CreateTopic fails with UnknownServerException if num partitions >= QuorumController.MAX_RECORDS_PER_BATCH
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724948#comment-17724948 ] Edoardo Comar commented on KAFKA-14996: --- is the state the controller gets in similar to https://issues.apache.org/jira/browse/KAFKA-14644 ? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14996) CreateTopic fails with UnknownServerException if num partitions >= QuorumController.MAX_RECORDS_PER_BATCH
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724946#comment-17724946 ] Edoardo Comar edited comment on KAFKA-14996 at 5/22/23 1:10 PM: The controller instability is not reproducible with 3.4 (at the git commit `2f13471181`), so it must be a regression. was (Author: ecomar): The controller instability is not reproducible with 3.4 (at the git commit `721a917b44`), so it must be a regression. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14996) CreateTopic fails with UnknownServerException if num partitions >= QuorumController.MAX_RECORDS_PER_BATCH
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724946#comment-17724946 ] Edoardo Comar commented on KAFKA-14996: --- The controller instability is not reproducible with 3.4 (at the git commit `721a917b44`), so it must be a regression. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14996) CreateTopic fails with UnknownServerException if num partitions >= QuorumController.MAX_RECORDS_PER_BATCH
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724307#comment-17724307 ]

Edoardo Comar edited comment on KAFKA-14996 at 5/19/23 2:55 PM:
---
Given that this means a client request can cause a cluster to become unavailable, I'd raise the Priority to Critical.
Is this a potential denial of service attack?
cc [~mimaison] [~ijuma] [~rajinisiva...@gmail.com]

was (Author: ecomar):
Given that this means a client request can cause a cluster to become unavailable, I'd raise the Priority to Critical.
This is a potential denial of service attack.
cc [~mimaison] [~ijuma] [~rajinisiva...@gmail.com]
[jira] [Comment Edited] (KAFKA-14996) CreateTopic fails with UnknownServerException if num partitions >= QuorumController.MAX_RECORDS_PER_BATCH
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724307#comment-17724307 ]

Edoardo Comar edited comment on KAFKA-14996 at 5/19/23 2:55 PM:
---
Given that this means a client request can cause a cluster to become unavailable, I'd raise the Priority to Critical.
This is a potential denial of service attack.
cc [~mimaison] [~ijuma] [~rajinisiva...@gmail.com]

was (Author: ecomar):
Given that this means a client request can cause a cluster to become unavailable, I'd raise the Priority to Critical.
[jira] [Commented] (KAFKA-14996) CreateTopic fails with UnknownServerException if num partitions >= QuorumController.MAX_RECORDS_PER_BATCH
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724308#comment-17724308 ]

Edoardo Comar commented on KAFKA-14996:
---
The controller logs are full of:

[2023-05-19 15:50:18,834] WARN [QuorumController id=0] getFinalizedFeatures: failed with unknown server exception RuntimeException in 28 us. The controller is already in standby mode. (org.apache.kafka.controller.QuorumController)
java.lang.RuntimeException: No in-memory snapshot for epoch 84310. Snapshot epochs are: 61900
 at org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:173)
 at org.apache.kafka.timeline.SnapshotRegistry.iterator(SnapshotRegistry.java:131)
 at org.apache.kafka.timeline.TimelineObject.get(TimelineObject.java:69)
 at org.apache.kafka.controller.FeatureControlManager.finalizedFeatures(FeatureControlManager.java:303)
 at org.apache.kafka.controller.QuorumController.lambda$finalizedFeatures$16(QuorumController.java:2016)
 at org.apache.kafka.controller.QuorumController$ControllerReadEvent.run(QuorumController.java:546)
 at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
 at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
 at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
 at java.base/java.lang.Thread.run(Thread.java:829)
[jira] [Commented] (KAFKA-14996) CreateTopic fails with UnknownServerException if num partitions >= QuorumController.MAX_RECORDS_PER_BATCH
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724307#comment-17724307 ]

Edoardo Comar commented on KAFKA-14996:
---
Given that this means a client request can cause a cluster to become unavailable, I'd raise the Priority to Critical.
[jira] [Updated] (KAFKA-14996) CreateTopic fails with UnknownServerException if num partitions >= QuorumController.MAX_RECORDS_PER_BATCH
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Edoardo Comar updated KAFKA-14996:
--
Priority: Critical (was: Major)
[jira] [Comment Edited] (KAFKA-14996) CreateTopic fails with UnknownServerException if num partitions >= QuorumController.MAX_RECORDS_PER_BATCH
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724300#comment-17724300 ]

Edoardo Comar edited comment on KAFKA-14996 at 5/19/23 2:48 PM:
---
A similar error is encountered when creating partitions > QuorumController.MAX_RECORDS_PER_BATCH on an existing topic.
More worrying is that the cluster looks like it can be unstable after the error occurs.
Seen in a cluster with 6 nodes: 0,1,2=broker,controller and 3,4,5=broker.
e.g. server.log for node 1:

[2023-05-19 15:43:32,640] INFO [RaftManager id=1] Completed transition to CandidateState(localId=1, epoch=300, retries=86, voteStates={0=UNRECORDED, 1=GRANTED, 2=UNRECORDED}, highWatermark=Optional.empty, electionTimeoutMs=1145) from CandidateState(localId=1, epoch=299, retries=85, voteStates={0=UNRECORDED, 1=GRANTED, 2=UNRECORDED}, highWatermark=Optional.empty, electionTimeoutMs=1817) (org.apache.kafka.raft.QuorumState)
[2023-05-19 15:43:32,649] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 0 when making an ApiVersionsRequest with correlation id 4646. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:32,650] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 2 when making an ApiVersionsRequest with correlation id 4647. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:33,095] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 0 when making an ApiVersionsRequest with correlation id 4652. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:33,147] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 2 when making an ApiVersionsRequest with correlation id 4656. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:33,594] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 0 when making an ApiVersionsRequest with correlation id 4678. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:33,696] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 2 when making an ApiVersionsRequest with correlation id 4684. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:33,773] INFO [RaftManager id=1] Election has timed out, backing off for 1000ms before becoming a candidate again (org.apache.kafka.raft.KafkaRaftClient)
[2023-05-19 15:43:34,774] INFO [RaftManager id=1] Re-elect as candidate after election backoff has completed (org.apache.kafka.raft.KafkaRaftClient)
[2023-05-19 15:43:34,784] INFO [RaftManager id=1] Completed transition to CandidateState(localId=1, epoch=301, retries=87, voteStates={0=UNRECORDED, 1=GRANTED, 2=UNRECORDED}, highWatermark=Optional.empty, electionTimeoutMs=1022) from CandidateState(localId=1, epoch=300, retries=86, voteStates={0=UNRECORDED, 1=GRANTED, 2=UNRECORDED}, highWatermark=Optional.empty, electionTimeoutMs=1145) (org.apache.kafka.raft.QuorumState)
[2023-05-19 15:43:34,802] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 0 when making an ApiVersionsRequest with correlation id 4691. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:34,825] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 2 when making an ApiVersionsRequest with correlation id 4692. Disconnecting. (org.apache.kafka.clients.NetworkClient)

In this state, client requests that should mutate the metadata (e.g. delete a topic) always time out:

% bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic edotest1
Error while executing topic command : Call(callName=deleteTopics, deadlineMs=1684507597582, tries=1, nextAllowedTryMs=1684507597698) timed out at 1684507597598 after 1 attempt(s)
[2023-05-19 15:46:37,602] ERROR org.apache.kafka.common.errors.TimeoutException: Call(callName=deleteTopics, deadlineMs=1684507597582, tries=1, nextAllowedTryMs=1684507597698) timed out at 1684507597598 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled deleteTopics request with correlation id 5 due to node 5 being disconnected
 (kafka.admin.TopicCommand$)
[jira] [Commented] (KAFKA-14996) CreateTopic fails with UnknownServerException if num partitions >= QuorumController.MAX_RECORDS_PER_BATCH
[ https://issues.apache.org/jira/browse/KAFKA-14996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724300#comment-17724300 ]

Edoardo Comar commented on KAFKA-14996:
---
A similar error is encountered when creating partitions > QuorumController.MAX_RECORDS_PER_BATCH on an existing topic.
More worrying is that the cluster looks like it can be unstable after the error occurs.
Seen in a cluster with 6 nodes: 0,1,2=broker,controller and 3,4,5=broker.
e.g. server.log for node 1:

[2023-05-19 15:43:32,640] INFO [RaftManager id=1] Completed transition to CandidateState(localId=1, epoch=300, retries=86, voteStates={0=UNRECORDED, 1=GRANTED, 2=UNRECORDED}, highWatermark=Optional.empty, electionTimeoutMs=1145) from CandidateState(localId=1, epoch=299, retries=85, voteStates={0=UNRECORDED, 1=GRANTED, 2=UNRECORDED}, highWatermark=Optional.empty, electionTimeoutMs=1817) (org.apache.kafka.raft.QuorumState)
[2023-05-19 15:43:32,649] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 0 when making an ApiVersionsRequest with correlation id 4646. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:32,650] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 2 when making an ApiVersionsRequest with correlation id 4647. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:33,095] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 0 when making an ApiVersionsRequest with correlation id 4652. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:33,147] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 2 when making an ApiVersionsRequest with correlation id 4656. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:33,594] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 0 when making an ApiVersionsRequest with correlation id 4678. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:33,696] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 2 when making an ApiVersionsRequest with correlation id 4684. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:33,773] INFO [RaftManager id=1] Election has timed out, backing off for 1000ms before becoming a candidate again (org.apache.kafka.raft.KafkaRaftClient)
[2023-05-19 15:43:34,774] INFO [RaftManager id=1] Re-elect as candidate after election backoff has completed (org.apache.kafka.raft.KafkaRaftClient)
[2023-05-19 15:43:34,784] INFO [RaftManager id=1] Completed transition to CandidateState(localId=1, epoch=301, retries=87, voteStates={0=UNRECORDED, 1=GRANTED, 2=UNRECORDED}, highWatermark=Optional.empty, electionTimeoutMs=1022) from CandidateState(localId=1, epoch=300, retries=86, voteStates={0=UNRECORDED, 1=GRANTED, 2=UNRECORDED}, highWatermark=Optional.empty, electionTimeoutMs=1145) (org.apache.kafka.raft.QuorumState)
[2023-05-19 15:43:34,802] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 0 when making an ApiVersionsRequest with correlation id 4691. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[2023-05-19 15:43:34,825] WARN [RaftManager id=1] Received error UNKNOWN_SERVER_ERROR from node 2 when making an ApiVersionsRequest with correlation id 4692. Disconnecting. (org.apache.kafka.clients.NetworkClient)
[jira] [Commented] (KAFKA-13279) Implement CreateTopicsPolicy for KRaft
[ https://issues.apache.org/jira/browse/KAFKA-13279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724293#comment-17724293 ]

Edoardo Comar commented on KAFKA-13279:
---
Shouldn't this one be marked as closed / resolved?

> Implement CreateTopicsPolicy for KRaft
> --
>
> Key: KAFKA-13279
> URL: https://issues.apache.org/jira/browse/KAFKA-13279
> Project: Kafka
> Issue Type: Improvement
> Reporter: Colin McCabe
> Assignee: Colin McCabe
> Priority: Major
>
> Implement CreateTopicsPolicy for KRaft