[jira] [Comment Edited] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163248#comment-17163248 ]

Akshay Sharma edited comment on KAFKA-10284 at 7/23/20, 6:24 AM:
-----------------------------------------------------------------

Hi [~bchen225242]/[~guozhang], I raised a bug for this same issue 6 days ago. Please take a look: https://issues.apache.org/jira/browse/KAFKA-10285

Analysis: when I restarted the consumer without restarting the broker, I saw the logs below:

```
[2020-07-16 13:56:17,189] INFO [GroupCoordinator 1001]: Preparing to rebalance group 0 in state PreparingRebalance with old generation 0 (__consumer_offsets-48) (reason: Adding new member 1-159490144 with group instanceid Some(1)) (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:56:17,236] INFO [GroupCoordinator 1001]: Stabilized group 0 generation 1 (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:56:17,282] INFO [GroupCoordinator 1001]: Assignment received from leader for group 0 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:59:33,613] INFO [GroupCoordinator 1001]: Static member Some(1) with unknown member id rejoins, assigning new member id 1-159490335, while old member 1-159490144 will be removed. (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:59:33,635] INFO [GroupCoordinator 1001]: Static member joins during Stable stage will not trigger rebalance. (kafka.coordinator.group.GroupCoordinator)
```

After restarting the broker, the broker expected a different member.id (the consumer's old member.id):

```
[2020-07-16 14:04:04,953] ERROR given member.id 1-159490335 is identified as a known static member 1,but not matching the expected member.id 1-159490144 (kafka.coordinator.group.GroupMetadata)
```

was (Author: akshaysh):
Hi [~bchen225242]/[~guozhang], I raised a bug for this same issue. Please take a look: https://issues.apache.org/jira/browse/KAFKA-10285

Analysis: when I restarted the consumer without restarting the broker, I saw the logs below:

```
[2020-07-16 13:56:17,189] INFO [GroupCoordinator 1001]: Preparing to rebalance group 0 in state PreparingRebalance with old generation 0 (__consumer_offsets-48) (reason: Adding new member 1-159490144 with group instanceid Some(1)) (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:56:17,236] INFO [GroupCoordinator 1001]: Stabilized group 0 generation 1 (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:56:17,282] INFO [GroupCoordinator 1001]: Assignment received from leader for group 0 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:59:33,613] INFO [GroupCoordinator 1001]: Static member Some(1) with unknown member id rejoins, assigning new member id 1-159490335, while old member 1-159490144 will be removed. (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:59:33,635] INFO [GroupCoordinator 1001]: Static member joins during Stable stage will not trigger rebalance. (kafka.coordinator.group.GroupCoordinator)
```

After restarting the broker, the broker expected a different member.id (the consumer's old member.id):

```
[2020-07-16 14:04:04,953] ERROR given member.id 1-159490335 is identified as a known static member 1,but not matching the expected member.id 1-159490144 (kafka.coordinator.group.GroupMetadata)
```

> Group membership update due to static member rejoin should be persisted
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-10284
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10284
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>             Fix For: 2.6.1
>
>
> For a known static member rejoin, we update its corresponding member.id without triggering a new rebalance. This avoids an unnecessary rebalance for static membership, and also fences anything that still uses the old member.id.
> The bug is that we don't actually persist the membership update, so if no subsequent rebalance is triggered, the new member.id is lost during group coordinator immigration, bringing back the zombie member identity.
> Credit for finding the bug goes to [~hachikuji]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
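The failure mode described in the issue can be illustrated with a small simulation (a Python stand-in with invented names, not Kafka's actual Scala code): the static member's member.id is swapped in memory, but without the fix it is never written to the log that a new coordinator replays on immigration, so the old id comes back as a zombie.

```python
class GroupCoordinator:
    def __init__(self, persisted):
        self.persisted = dict(persisted)  # membership written to __consumer_offsets
        self.in_memory = dict(persisted)  # live group metadata

    def static_member_rejoin(self, instance_id, new_member_id, persist):
        # Known static member rejoins: swap in the new member.id, no rebalance.
        self.in_memory[instance_id] = new_member_id
        if persist:  # the fix: also write the updated membership to the log
            self.persisted[instance_id] = new_member_id

    def failover(self):
        # Coordinator immigration: the new coordinator rebuilds state from the log.
        return GroupCoordinator(self.persisted)

buggy = GroupCoordinator({"1": "1-159490144"})  # group.instance.id -> member.id
buggy.static_member_rejoin("1", "1-159490335", persist=False)
assert buggy.failover().in_memory["1"] == "1-159490144"  # zombie old member.id

fixed = GroupCoordinator({"1": "1-159490144"})
fixed.static_member_rejoin("1", "1-159490335", persist=True)
assert fixed.failover().in_memory["1"] == "1-159490335"  # update survives failover
```

The first assertion reproduces the reported symptom: the failed-over coordinator expects the stale member.id and fences the legitimate consumer.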
[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163248#comment-17163248 ]

Akshay Sharma commented on KAFKA-10284:
---------------------------------------

Hi [~bchen225242]/[~guozhang], I raised a bug for this same issue. Please take a look: https://issues.apache.org/jira/browse/KAFKA-10285

Analysis: when I restarted the consumer without restarting the broker, I saw the logs below:

```
[2020-07-16 13:56:17,189] INFO [GroupCoordinator 1001]: Preparing to rebalance group 0 in state PreparingRebalance with old generation 0 (__consumer_offsets-48) (reason: Adding new member 1-159490144 with group instanceid Some(1)) (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:56:17,236] INFO [GroupCoordinator 1001]: Stabilized group 0 generation 1 (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:56:17,282] INFO [GroupCoordinator 1001]: Assignment received from leader for group 0 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:59:33,613] INFO [GroupCoordinator 1001]: Static member Some(1) with unknown member id rejoins, assigning new member id 1-159490335, while old member 1-159490144 will be removed. (kafka.coordinator.group.GroupCoordinator)
[2020-07-16 13:59:33,635] INFO [GroupCoordinator 1001]: Static member joins during Stable stage will not trigger rebalance. (kafka.coordinator.group.GroupCoordinator)
```

After restarting the broker, the broker expected a different member.id (the consumer's old member.id):

```
[2020-07-16 14:04:04,953] ERROR given member.id 1-159490335 is identified as a known static member 1,but not matching the expected member.id 1-159490144 (kafka.coordinator.group.GroupMetadata)
```

> Group membership update due to static member rejoin should be persisted
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-10284
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10284
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>             Fix For: 2.6.1
>
>
> For a known static member rejoin, we update its corresponding member.id without triggering a new rebalance. This avoids an unnecessary rebalance for static membership, and also fences anything that still uses the old member.id.
> The bug is that we don't actually persist the membership update, so if no subsequent rebalance is triggered, the new member.id is lost during group coordinator immigration, bringing back the zombie member identity.
> Credit for finding the bug goes to [~hachikuji]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] huxihx commented on pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
huxihx commented on pull request #9051: URL: https://github.com/apache/kafka/pull/9051#issuecomment-662835750 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10205) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163394#comment-17163394 ]

Igor Soarez commented on KAFKA-10205:
-------------------------------------

I'm happy to take this one on.

> NullPointerException in StreamTask
> ----------------------------------
>
>                 Key: KAFKA-10205
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10205
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Brian Forkan
>            Priority: Minor
>              Labels: beginner, newbie
>
> In our Kafka Streams application we have been experiencing a NullPointerException when deploying a new version of our application. This does not happen during a normal rolling restart.
> The exception is:
> {code:java}
> Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance","stack_trace":"java.lang.NullPointerException: null
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:186)
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)
> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)
> at org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
> at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
> at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86)
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> And the relevant lines of code: [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196]
> I suspect "topology.source(partition.topic());" is returning null.
> Has anyone experienced this issue before? I suspect there is a problem with our topology, but I can't replicate this on my machine, so I can't tell.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
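The reporter's suspicion is that a lookup (`topology.source(partition.topic())`) returns null for a topic the topology no longer knows. A small illustration of the general defensive pattern, in Python as a stand-in for the Java code (names and messages are invented, not Kafka's): check the nullable lookup and fail with an actionable message instead of letting an opaque NullPointerException escape.

```python
def source_node(topology, topic):
    """Look up the source node for a topic; the lookup may return None."""
    node = topology.get(topic)  # analogous to topology.source(topic)
    if node is None:
        # Fail fast with context instead of dereferencing None later.
        raise ValueError(
            f"Topic {topic!r} is unknown to this topology; "
            "was the topology changed between deployments?")
    return node

topology = {"orders": "source-orders"}
assert source_node(topology, "orders") == "source-orders"
try:
    source_node(topology, "payments")
except ValueError as e:
    assert "payments" in str(e)
```

The point is diagnostic: the same rebalance would still abort, but the error would name the offending topic rather than printing `NullPointerException: null`.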
[jira] [Assigned] (KAFKA-10205) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Igor Soarez reassigned KAFKA-10205:
-----------------------------------

    Assignee: Igor Soarez

> NullPointerException in StreamTask
> ----------------------------------
>
>                 Key: KAFKA-10205
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10205
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Brian Forkan
>            Assignee: Igor Soarez
>            Priority: Minor
>              Labels: beginner, newbie
>
> In our Kafka Streams application we have been experiencing a NullPointerException when deploying a new version of our application. This does not happen during a normal rolling restart.
> The exception is:
> {code:java}
> Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance","stack_trace":"java.lang.NullPointerException: null
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:186)
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)
> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)
> at org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
> at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
> at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86)
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> And the relevant lines of code: [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196]
> I suspect "topology.source(partition.topic());" is returning null.
> Has anyone experienced this issue before? I suspect there is a problem with our topology, but I can't replicate this on my machine, so I can't tell.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] bob-barrett commented on pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log
bob-barrett commented on pull request #9054: URL: https://github.com/apache/kafka/pull/9054#issuecomment-662841978 While addressing the feedback, I realized I had missed the case when replacing a current log with a future log as part of altering a partition's log dir. We also deferred metrics removal until the file deletion in that case. That has been fixed now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
stanislavkozlovski commented on pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#issuecomment-662860332

> add preempt(): Unit method for all ControllerEvent so that all events (and future events) must implement it
> for events that have callbacks, move the preemption from individual methods to preempt()
> add preemption for ApiPartitionReassignment and ListPartitionReassignments

Great idea

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
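The scheme in the quoted bullets can be sketched as a toy model (a Python stand-in with invented names, not the actual Scala controller code): every event implements `preempt()`, and on shutdown the still-queued events are preempted so their callbacks complete instead of being silently dropped.

```python
from collections import deque

class Event:
    """Controller event with a completion callback (hypothetical shape)."""
    def __init__(self, name, callback):
        self.name = name
        self.callback = callback

    def process(self):
        self.callback("ok")

    def preempt(self):
        # e.g. complete the callback with a "not controller" style error
        self.callback("NOT_CONTROLLER")

results = {}
queue = deque(
    Event(n, lambda r, n=n: results.setdefault(n, r))
    for n in ("api-partition-reassignment", "list-partition-reassignments")
)

queue.popleft().process()   # one event is processed normally
while queue:                # shutdown: preempt everything still queued
    queue.popleft().preempt()

assert results == {
    "api-partition-reassignment": "ok",
    "list-partition-reassignments": "NOT_CONTROLLER",
}
```

Putting preemption on the event itself (rather than in each handler method) is what lets new event types inherit the obligation automatically.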
[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining
showuon commented on pull request #9062: URL: https://github.com/apache/kafka/pull/9062#issuecomment-662919173 @feyman2016 @huxihx , since you have experience in this test before, could you review this small PR? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
dajac commented on a change in pull request #9051:
URL: https://github.com/apache/kafka/pull/9051#discussion_r459252113

##########
File path: core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
##########

@@ -775,6 +775,24 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSetup {
     verifyMarkPartitionsForTruncation()
   }

+  @Test
+  def testDefaultValueRestoredAfterDeleteDynamicConfig(): Unit = {
+    val newProps = new Properties
+    newProps.put(KafkaConfig.LogRetentionTimeMillisProp, "10")
+    newProps.put(KafkaConfig.LogFlushIntervalMsProp, "1")
+    TestUtils.incrementalAlterConfigs(servers, adminClients.head, newProps, perBrokerConfig = false).all.get

Review comment:
That makes sense, thanks.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
stanislavkozlovski commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r459262136

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########

@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     }, "Broker fail to initialize after restart")
   }

+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+    servers = makeServers(1, enableControlledShutdown = false)
+    val controller = getController().kafkaController
+    val count = new AtomicInteger(2)
+    val latch = new CountDownLatch(1)
+    val spyThread = spy(controller.eventManager.thread)
+    controller.eventManager.setControllerEventThread(spyThread)
+    val processedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = latch.await()
+      override def preempt(): Unit = {}
+    }
+    val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = {}
+      override def preempt(): Unit = count.decrementAndGet()
+    }
+
+    controller.eventManager.put(processedEvent)
+    controller.eventManager.put(preemptedEvent)
+    controller.eventManager.put(preemptedEvent)
+
+    doAnswer((_: InvocationOnMock) => {
+      latch.countDown()
+    }).doCallRealMethod().when(spyThread).awaitShutdown()

Review comment:
Could we have the test call `latch.countDown()` in a background thread with a delay, right before we call controller.shutdown? I guess it's possible to have race conditions with that solution

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining
showuon opened a new pull request #9062:
URL: https://github.com/apache/kafka/pull/9062

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients
[ https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163344#comment-17163344 ]

Paul Webb edited comment on KAFKA-8154 at 7/23/20, 8:52 AM:
------------------------------------------------------------

Hi. Please note that we were recently affected by this issue. I tried several Kafka clients/brokers and JDKs. The actual cause in the end was that the Java security providers were being changed. Specifically:

{code:java}
Security.insertProviderAt(providerToAdd, 1);
{code}

By adding at position 1, it appeared that the new provider was returning an invalid application buffer size. This may explain why this can be difficult to reproduce. This is some logging to show the issue:

{noformat}
2020-07-23 08:13:55.963Z INFO - Adding provider: OpenSSLProvider: Conscrypt info: Android's OpenSSL-backed security provider
2020-07-23 08:13:55.963Z INFO - SECURITY PROVIDERS -
2020-07-23 08:13:55.965Z INFO - -> Sun: SUN info: SUN (DSA key/parameter generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; JKS & DKS keystores; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, Collection CertStores, JavaPolicy Policy; JavaLoginConfig Configuration)
2020-07-23 08:13:55.965Z INFO - -> SunRsaSign: SunRsaSign info: Sun RSA signature provider
2020-07-23 08:13:55.965Z INFO - -> SunEC: SunEC info: Sun Elliptic Curve provider (EC, ECDSA, ECDH)
2020-07-23 08:13:55.966Z INFO - -> Provider: SunJSSE info: Sun JSSE provider(PKCS12, SunX509/PKIX key/trust factories, SSLv3/TLSv1/TLSv1.1/TLSv1.2)
2020-07-23 08:13:55.966Z INFO - -> SunJCE: SunJCE info: SunJCE Provider (implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, Diffie-Hellman, HMAC)
2020-07-23 08:13:55.966Z INFO - -> SunProvider: SunJGSS info: Sun (Kerberos v5, SPNEGO)
2020-07-23 08:13:55.966Z INFO - -> Provider: SunSASL info: Sun SASL provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, GSSAPI, CRAM-MD5, NTLM)
2020-07-23 08:13:55.966Z INFO - -> XMLDSigRI: XMLDSig info: XMLDSig (DOM XMLSignatureFactory; DOM KeyInfoFactory; C14N 1.0, C14N 1.1, Exclusive C14N, Base64, Enveloped, XPath, XPath2, XSLT TransformServices)
2020-07-23 08:13:55.966Z INFO - -> SunPCSC: SunPCSC info: Sun PC/SC provider
2020-07-23 08:13:55.966Z INFO - -> BouncyCastleProvider: BC info: BouncyCastle Security Provider v1.61
2020-07-23 08:13:55.966Z INFO - SECURITY PROVIDERS -
{noformat}

So my hypothesis is that the new provider (in this case from Conscrypt) is inserted at the head of the list. When getApplicationBufferSize is called on the SSLSession, it should return MAX_PLAINTEXT_LENGTH, which is 2^14 (16384) as per [https://tools.ietf.org/html/rfc5246#section-6.2.1]. When the new provider was added to the back of the list, Kafka behaved fine and this issue disappeared completely. Hope this helps.

was (Author: pwebb.itrs):
Hi. Please note that we were recently affected by this issue. I tried several Kafka clients/brokers and JDKs. The actual cause in the end was that the Java security providers were being changed. Specifically:

{code:java}
Security.insertProviderAt(providerToAdd, 1);
{code}

By adding at position 1, it appeared that the new provider was returning an invalid application buffer size. This may explain why this can be difficult to reproduce. When the new provider was added to the back of the list, Kafka behaved fine. Hope this helps.

> Buffer Overflow exceptions between brokers and with clients
> -----------------------------------------------------------
>
>                 Key: KAFKA-8154
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8154
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.1.0
>            Reporter: Rajesh Nataraja
>            Priority: Major
>         Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
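Why the insert position matters can be shown with a toy model (pure Python, not the JVM security API): provider lookups walk the list in priority order and the first match wins, so a provider inserted at position 1 shadows the default SunJSSE implementation. The non-default buffer size below is hypothetical.

```python
MAX_PLAINTEXT_LENGTH = 2 ** 14  # 16384 bytes, per RFC 5246 section 6.2.1

def effective_app_buffer_size(providers):
    # Mirrors "highest-priority provider is consulted first".
    return providers[0]["app_buffer_size"]

default_jsse = {"name": "SunJSSE", "app_buffer_size": MAX_PLAINTEXT_LENGTH}
new_provider = {"name": "Conscrypt", "app_buffer_size": 8192}  # hypothetical size

# Security.insertProviderAt(provider, 1): the new provider takes priority,
# and SSL sessions may report an unexpected application buffer size.
assert effective_app_buffer_size([new_provider, default_jsse]) != MAX_PLAINTEXT_LENGTH

# Appending the provider at the back of the list leaves the default in charge,
# matching the observation that Kafka then behaved fine.
assert effective_app_buffer_size([default_jsse, new_provider]) == MAX_PLAINTEXT_LENGTH
```

This is only a sketch of the ordering argument in the comment; the actual buffer size a shadowing provider reports depends on that provider's SSLSession implementation.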
[jira] [Assigned] (KAFKA-9343) Add ps command for Kafka and zookeeper process on z/OS.
[ https://issues.apache.org/jira/browse/KAFKA-9343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shuo Zhang reassigned KAFKA-9343:
---------------------------------

    Assignee: Shuo Zhang

> Add ps command for Kafka and zookeeper process on z/OS.
> -------------------------------------------------------
>
>                 Key: KAFKA-9343
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9343
>             Project: Kafka
>          Issue Type: Task
>          Components: tools
>    Affects Versions: 2.4.0
>         Environment: z/OS, OS/390
>            Reporter: Shuo Zhang
>            Assignee: Shuo Zhang
>            Priority: Major
>              Labels: OS/390, z/OS
>             Fix For: 2.5.0, 2.4.2
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> +Note: since the final change scope changed, I changed the summary and description.+
> The existing method of checking for the Kafka process on other platforms isn't applicable on z/OS; on z/OS, the best keyword we can use is the JOBNAME.
> PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '\{print $1}')
> -->
> PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v grep | awk '\{print $1}')
> The same applies to the zookeeper process.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients
[ https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163344#comment-17163344 ]

Paul Webb commented on KAFKA-8154:
----------------------------------

Hi. Please note that we were recently affected by this issue. I tried several Kafka clients/brokers and JDKs. The actual cause in the end was that the Java security providers were being changed. Specifically:

{code:java}
Security.insertProviderAt(providerToAdd, 1);
{code}

By adding at position 1, it appeared that the new provider was returning an invalid application buffer size. This may explain why this can be difficult to reproduce. When the new provider was added to the back of the list, Kafka behaved fine. Hope this helps.

> Buffer Overflow exceptions between brokers and with clients
> -----------------------------------------------------------
>
>                 Key: KAFKA-8154
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8154
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.1.0
>            Reporter: Rajesh Nataraja
>            Priority: Major
>         Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] leonardge opened a new pull request #9063: Fixed deprecated Gradle build Properties.
leonardge opened a new pull request #9063:
URL: https://github.com/apache/kafka/pull/9063

The Gradle properties `baseName`, `classifier` and `version` have been deprecated, so I have changed these to `archiveBaseName`, `archiveClassifier` and `archiveVersion`. More information [here](https://docs.gradle.org/6.5/dsl/org.gradle.api.tasks.bundling.Zip.html#org.gradle.api.tasks.bundling.Zip:zip64).

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
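The property renames in the PR description look like this in a build script (a hypothetical task; only the old/new property names come from the PR, the task and values are illustrative):

```groovy
// Hypothetical archive task showing the migration described above:
//   baseName  -> archiveBaseName, classifier -> archiveClassifier,
//   version   -> archiveVersion
tasks.register('srcJar', Jar) {
    archiveBaseName = 'kafka'      // was: baseName 'kafka'
    archiveClassifier = 'sources'  // was: classifier 'sources'
    archiveVersion = project.version.toString()  // was: version ...
}
```

The `archive*` variants are lazy `Property<String>` objects, which is why they are assigned with `=` rather than called as methods.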
[jira] [Commented] (KAFKA-8098) Flaky Test AdminClientIntegrationTest#testConsumerGroups
[ https://issues.apache.org/jira/browse/KAFKA-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163397#comment-17163397 ] Luke Chen commented on KAFKA-8098: -- In the test, we first test removing 1 member from group, and then test removing the other 2 members from group, and it failed sometimes at the 2nd member number assert. After investigation, I found it's because we enabled auto commit for the consumers(default setting), and the removed consumer offset commit will get the {{UNKNOWN_MEMBER_ID}} error, which will then make the member rejoin. (check ConsumerCoordinator#OffsetCommitResponseHandler) So, that's why after the 2nd members removing, the members will sometimes be not empty. I set the consumer config to disable the auto commit to fix this issue. Thanks. > Flaky Test AdminClientIntegrationTest#testConsumerGroups > > > Key: KAFKA-8098 > URL: https://issues.apache.org/jira/browse/KAFKA-8098 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0, 2.2.1 > > > [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3459/tests] > {quote}java.lang.AssertionError: expected:<2> but was:<0> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:647) > at org.junit.Assert.assertEquals(Assert.java:633) > at > kafka.api.AdminClientIntegrationTest.testConsumerGroups(AdminClientIntegrationTest.scala:1194){quote} > STDOUT > {quote}2019-03-12 10:52:33,482] ERROR [ReplicaFetcher replicaId=2, > leaderId=1, fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. 
> [2019-03-12 10:52:33,485] ERROR [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:52:35,880] WARN Unable to read additional data from client > sessionid 0x104458575770003, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) > [2019-03-12 10:52:38,596] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition elect-preferred-leaders-topic-1-0 at offset > 0 (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:52:38,797] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition elect-preferred-leaders-topic-2-0 at offset > 0 (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:52:51,998] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:52:52,005] ERROR [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:53:13,750] ERROR [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Error for partition mytopic2-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. 
> [2019-03-12 10:53:13,754] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error for partition mytopic2-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:53:13,755] ERROR [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error for partition mytopic2-1 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:53:13,760] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition mytopic2-1 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This
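The fix described above amounts to overriding the consumer's default offset-commit behavior. A minimal sketch of that configuration: the property keys (`enable.auto.commit`, `bootstrap.servers`, `group.id`) are standard Kafka consumer configs, but the class and values here are illustrative.

```java
import java.util.Properties;

public class NoAutoCommitConfig {
    // Disable auto commit so a removed consumer's background offset commit
    // cannot hit UNKNOWN_MEMBER_ID and trigger a rejoin, as described in the
    // comment above. "enable.auto.commit" defaults to "true".
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("localhost:9092", "test-group"));
    }
}
```

These properties would then be passed to the `KafkaConsumer` constructor as usual.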
[jira] [Comment Edited] (KAFKA-8037) KTable restore may load bad data
[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163485#comment-17163485 ] Bruno Cadonna edited comment on KAFKA-8037 at 7/23/20, 12:04 PM: - I find the idea of putting records in their original byte representation into state stores on source tables, in combination with deserializing records during restoration, promising. With this, we would have in the state store the same data as in the source topic minus the bad data. Having these two mechanisms in place would allow us to switch the optimization on by default without any further restrictions on serdes, since only the deserializer is used and never the serializer. was (Author: cadonna): I find the idea of putting records in their original byte representation into state stores on source tables, in combination with deserializing records during restoration, interesting. With this, we would have in the state store the same data as in the source topic minus the bad data. Having these two mechanisms in place would allow us to switch the optimization on by default without any further restrictions on serdes, since only the deserializer is used and never the serializer. > KTable restore may load bad data > > > Key: KAFKA-8037 > URL: https://issues.apache.org/jira/browse/KAFKA-8037 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: pull-request-available > > If an input topic contains bad data, users can specify a > `deserialization.exception.handler` to drop corrupted records on read. > However, this mechanism may be by-passed on restore. Assume a > `builder.table()` call reads and drops a corrupted record. If the table state > is lost and restored from the changelog topic, the corrupted record may be > copied into the store, because on restore plain bytes are copied. 
> If the KTable is used in a join, an internal `store.get()` call to lookup the > record would fail with a deserialization exception if the value part cannot > be deserialized. > GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for > GlobalKTable case). It's unclear to me atm, how this issue could be addressed > for KTables though. > Note, that user state stores are not affected, because they always have a > dedicated changelog topic (and don't reuse an input topic) and thus the > corrupted record would not be written into the changelog. -- This message was sent by Atlassian Jira (v8.3.4#803005)
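A minimal illustration of the mechanism Bruno describes: keep raw bytes in the store, but run the deserializer over changelog records during restoration so bad data is dropped. All names here are illustrative; none of this is Kafka Streams code, and the toy deserializer stands in for the user's configured serde.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class RestoreFilterSketch {
    // Toy "deserializer": accepts UTF-8 digit strings only, throws otherwise.
    // Stands in for the user's value deserializer.
    static long deserialize(byte[] value) {
        return Long.parseLong(new String(value, StandardCharsets.UTF_8));
    }

    // Restoration pass over changelog records: try to deserialize each value,
    // keep the ORIGINAL bytes of records that deserialize cleanly, drop the rest.
    static List<byte[]> restore(List<byte[]> changelogValues) {
        List<byte[]> store = new ArrayList<>();
        for (byte[] value : changelogValues) {
            try {
                deserialize(value);  // validation only; result is discarded
                store.add(value);    // store raw bytes, as the source table would
            } catch (NumberFormatException corrupted) {
                // mirrors a deserialization.exception.handler: skip bad data
            }
        }
        return store;
    }
}
```

Because only the deserializer is exercised, the serializer never needs to round-trip, which is the reason this lifts the serde restriction mentioned above.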
[jira] [Commented] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress
[ https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163529#comment-17163529 ] Igor Piddubnyi commented on KAFKA-8582: --- Hi [~mjsax], as discussed in PR please assign the ticket to me. > Consider adding an ExpiredWindowRecordHandler to Suppress > - > > Key: KAFKA-8582 > URL: https://issues.apache.org/jira/browse/KAFKA-8582 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > > I got some feedback on Suppress: > {quote}Specifying how to handle events outside the grace period does seem > like a business concern, and simply discarding them thus seems risky (for > example imagine any situation where money is involved). > This sort of situation is addressed by the late-triggering approach > associated with watermarks > (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given > this I wondered if you were considering adding anything similar?{quote} > It seems like, if a record has arrived past the grace period for its window, > then the state of the windowed aggregation would already have been lost, so > if we were to compute an aggregation result, it would be incorrect. Plus, > since the window is already expired, we can't store the new (incorrect, but > more importantly expired) aggregation result either, so any subsequent > super-late records would also face the same blank-slate. I think this would > wind up looking like this: if you have three timely records for a window, and > then three more that arrive after the grace period, and you were doing a > count aggregation, you'd see the counts emitted for the window as [1, 2, 3, > 1, 1, 1]. I guess we could add a flag to the post-expiration results to > indicate that they're broken, but this seems like the wrong approach. 
The > post-expiration aggregation _results_ are meaningless, but I could see > wanting to send the past-expiration _input records_ to a dead-letter queue or > something instead of dropping them. > Along this line of thinking, I wonder if we should add an optional > past-expiration record handler interface to the suppression operator. Then, > you could define your own logic, whether it's a dead-letter queue, sending it > to some alerting pipeline, or even just crashing the application before it > can do something wrong. This would be a similar pattern to how we allow > custom logic to handle deserialization errors by supplying a > org.apache.kafka.streams.errors.DeserializationExceptionHandler. -- This message was sent by Atlassian Jira (v8.3.4#803005)
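The handler proposed here could look roughly like the following sketch. The interface name comes from the ticket title; everything else (method signature, `admit` helper) is hypothetical and not existing Streams API.

```java
import java.util.ArrayList;
import java.util.List;

public class SuppressExpiredSketch {
    // Hypothetical shape of the proposed handler, modeled on
    // org.apache.kafka.streams.errors.DeserializationExceptionHandler.
    interface ExpiredWindowRecordHandler<K, V> {
        void handle(K key, V value, long recordTimestamp, long windowEndTime);
    }

    // A suppression step would invoke the handler instead of silently dropping
    // a record that arrives after its window's grace period has elapsed.
    static <K, V> boolean admit(K key, V value, long recordTs, long windowEnd,
                                long gracePeriodMs,
                                ExpiredWindowRecordHandler<K, V> handler) {
        if (recordTs > windowEnd + gracePeriodMs) {
            handler.handle(key, value, recordTs, windowEnd); // e.g. dead-letter queue
            return false; // record is not aggregated
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        ExpiredWindowRecordHandler<String, Long> dlq =
            (k, v, ts, end) -> deadLetters.add(k + "=" + v);
        admit("a", 1L, 50L, 100L, 10L, dlq);  // timely: kept
        admit("b", 2L, 200L, 100L, 10L, dlq); // past grace: handed to the handler
        System.out.println(deadLetters);
    }
}
```

A user implementation could forward the record to a dead-letter topic, raise an alert, or crash the application, as the comment suggests.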
[jira] [Comment Edited] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME
[ https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163590#comment-17163590 ] Thiago Santos edited comment on KAFKA-9531 at 7/23/20, 1:45 PM: I am experiencing the same problem. When i start a local Kafka cluster with docker-compose. The kafka-connect producer gets stuck in this loop when i stop one of the containers in the Kafka cluster. Any update about this issue? {code:java} java.net.UnknownHostException: kafka3java.net.UnknownHostException: kafka3 at java.net.InetAddress.getAllByName0(InetAddress.java:1281) at java.net.InetAddress.getAllByName(InetAddress.java:1193) at java.net.InetAddress.getAllByName(InetAddress.java:1127) at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403) at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363) at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962) at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294) at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239){code} was (Author: tcsantos): I am experiencing the same problem. When i start a local Kafka cluster with docker-compose. The kafka-connect producer gets stuck in this loop when i stop one of the containers in the Kafka cluster. Any update about this issue? 
{code:java} kafka-connect | java.net.UnknownHostException: kafka3kafka-connect | java.net.UnknownHostException: kafka3kafka-connect | at java.net.InetAddress.getAllByName0(InetAddress.java:1281)kafka-connect | at java.net.InetAddress.getAllByName(InetAddress.java:1193)kafka-connect | at java.net.InetAddress.getAllByName(InetAddress.java:1127)kafka-connect | at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)kafka-connect | at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)kafka-connect | at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239){code} > java.net.UnknownHostException loop on VM rolling update using CNAME > --- > > Key: KAFKA-9531 > URL: https://issues.apache.org/jira/browse/KAFKA-9531 > Project: Kafka > Issue Type: Bug > Components: clients, controller, network, producer >Affects Versions: 2.4.0 >Reporter: Rui Abreu >Priority: Major > > Hello, > > My cluster setup in based on VMs behind DNS CNAME . 
> Example: node.internal is a CNAME to either nodeA.internal or nodeB.internal > Since kafka-client 1.2.1, it has been observed that sometimes Kafka clients > get stuck on a loop with the exception: > Example after nodeB.internal is replaced with nodeA.internal > > {code:java} > 2020-02-10T12:11:28.181Z o.a.k.c.NetworkClient [WARN]- [Consumer > clientId=consumer-6, groupId=consumer.group] Error connecting to node > nodeB.internal:9092 (id: 2 rack: null) > java.net.UnknownHostException: nodeB.internal:9092 > at java.net.InetAddress.getAllByName0(InetAddress.java:1281) > ~[?:1.8.0_222] > at java.net.InetAddress.getAllByName(InetAddress.java:1193) > ~[?:1.8.0_222] > at java.net.InetAddress.getAllByName(InetAddress.java:1127)
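The loop in these stack traces comes from the client repeatedly trying to resolve a hostname that no longer exists after the rolling update (or container stop). A minimal sketch of that failure mode; the class and method names are illustrative, not Kafka's ClientUtils.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class StaleDnsSketch {
    // Resolve a hostname the way a client would before connecting. An empty
    // result stands in for the UnknownHostException seen in the logs above:
    // the cached node address (e.g. "kafka3" or "nodeB.internal") points at a
    // host that DNS no longer knows about.
    static List<InetAddress> resolve(String host) {
        try {
            return Arrays.asList(InetAddress.getAllByName(host));
        } catch (UnknownHostException e) {
            return Collections.emptyList(); // stale CNAME target no longer resolves
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("localhost").isEmpty() ? "unresolved" : "resolved");
    }
}
```

Until the client refreshes its metadata and picks up the surviving node's address, every reconnect attempt repeats this resolution failure, which is why the exception loops.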
[GitHub] [kafka] dajac commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
dajac commented on a change in pull request #9060: URL: https://github.com/apache/kafka/pull/9060#discussion_r459258170 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams // have existed with the expected number of partitions, or some create topic returns fatal errors. log.debug("Starting to validate internal topics {} in partition assignor.", topics); -int remainingRetries = retries; +long currentWallClockMs = time.milliseconds(); +final long deadlineMs = currentWallClockMs + retryTimeoutMs; Review comment: If we want to guarantee that the `deadlineMs` is respected, I think that we must set the timeout of the AdminClient's call accordingly: `CreateTopicsOptions.timeoutMs`. With the default, I think that the call could be longer than half of `MAX_POLL_INTERVAL_MS_CONFIG`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
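The reviewer's point can be sketched as a deadline-capped per-call timeout: each AdminClient call's timeout should be the smaller of the default and the time remaining until `deadlineMs`, otherwise one call can overrun the deadline. Only `CreateTopicsOptions.timeoutMs` is real Kafka API; the method and parameter names below are illustrative.

```java
public class DeadlineTimeoutSketch {
    // Compute the timeout to pass to an individual AdminClient call
    // (e.g. via CreateTopicsOptions.timeoutMs) so the overall retry loop
    // never runs past deadlineMs.
    static int perCallTimeoutMs(long deadlineMs, long nowMs, int defaultTimeoutMs) {
        long remaining = deadlineMs - nowMs;
        if (remaining <= 0) {
            return 0; // deadline already passed; the caller should give up
        }
        return (int) Math.min(remaining, defaultTimeoutMs);
    }
}
```

The retry loop would recompute this before each attempt, so a late attempt gets a short timeout rather than the full default.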
[jira] [Comment Edited] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients
[ https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163344#comment-17163344 ] Paul Webb edited comment on KAFKA-8154 at 7/23/20, 8:53 AM: [~rsivaram] Hi. Please note that we were recently affected by this issue. I tried several kafka clients/ brokers and JDKs with no change. The actual cause in the end was that the java security providers was being changed. Specifically: {code:java} Security.insertProviderAt(providerToAdd, 1); {code} By adding at position 1, it appeared that the new provider was returning an invalid application buffer size. This may explain why this could be difficult to reproduce. This is some logging to show the issue: {noformat} 2020-07-23 08:13:55.963Z INFO - Adding provider: OpenSSLProvider: Conscrypt info: Android's OpenSSL-backed security provider 2020-07-23 08:13:55.963Z INFO - SECURITY PROVIDERS - 2020-07-23 08:13:55.965Z INFO - -> Sun: SUN info: SUN (DSA key/parameter generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; JKS & DKS keystores; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, Collection CertStores, JavaPolicy Po licy; JavaLoginConfig Configuration) 2020-07-23 08:13:55.965Z INFO - -> SunRsaSign: SunRsaSign info: Sun RSA signature provider 2020-07-23 08:13:55.965Z INFO - -> SunEC: SunEC info: Sun Elliptic Curve provider (EC, ECDSA, ECDH) 2020-07-23 08:13:55.966Z INFO - -> Provider: SunJSSE info: Sun JSSE provider(PKCS12, SunX509/PKIX key/trust factories, SSLv3/TLSv1/TLSv1.1/TLSv1.2) 2020-07-23 08:13:55.966Z INFO - -> SunJCE: SunJCE info: SunJCE Provider (implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, Diffie-Hellman, HMAC) 2020-07-23 08:13:55.966Z INFO - -> SunProvider: SunJGSS info: Sun (Kerberos v5, SPNEGO) 2020-07-23 08:13:55.966Z INFO - -> Provider: SunSASL info: Sun SASL provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, 
GSSAPI, CRAM-MD5, NTLM) 2020-07-23 08:13:55.966Z INFO - -> XMLDSigRI: XMLDSig info: XMLDSig (DOM XMLSignatureFactory; DOM KeyInfoFactory; C14N 1.0, C14N 1.1, Exclusive C14N, Base64, Enveloped, XPath, XPath2, XSLT TransformServices) 2020-07-23 08:13:55.966Z INFO - -> SunPCSC: SunPCSC info: Sun PC/SC provider 2020-07-23 08:13:55.966Z INFO - -> BouncyCastleProvider: BC info: BouncyCastle Security Provider v1.61 2020-07-23 08:13:55.966Z INFO - SECURITY PROVIDERS -{noformat} So my hypothesis is that the new provider ( in this case from Conscrypt) is inserted at the head of the list. When the SSLSession is called (getApplicationBufferSize) it returns MAX_PLAINTEXT_LENGTH which is 2^14 (16384) as per [https://tools.ietf.org/html/rfc5246#section-6.2.1] When the new provider is added to the back of the list Kafka behaved fine and this issued disappeared completely. Hope this helps. was (Author: pwebb.itrs): Hi. Please note that we were recently affected by this issue. I tried several kafka clients/ brokers and JDKs. The actual cause in the end was that the java security providers was being changed. Specifically: {code:java} Security.insertProviderAt(providerToAdd, 1); {code} By adding at position 1, it appeared that the new provider was returning an invalid application buffer size. This may explain why this could be difficult to reproduce. 
This is some logging to show the issue: {noformat} 2020-07-23 08:13:55.963Z INFO - Adding provider: OpenSSLProvider: Conscrypt info: Android's OpenSSL-backed security provider 2020-07-23 08:13:55.963Z INFO - SECURITY PROVIDERS - 2020-07-23 08:13:55.965Z INFO - -> Sun: SUN info: SUN (DSA key/parameter generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; JKS & DKS keystores; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, Collection CertStores, JavaPolicy Po licy; JavaLoginConfig Configuration) 2020-07-23 08:13:55.965Z INFO - -> SunRsaSign: SunRsaSign info: Sun RSA signature provider 2020-07-23 08:13:55.965Z INFO - -> SunEC: SunEC info: Sun Elliptic Curve provider (EC, ECDSA, ECDH) 2020-07-23 08:13:55.966Z INFO - -> Provider: SunJSSE info: Sun JSSE provider(PKCS12, SunX509/PKIX key/trust factories, SSLv3/TLSv1/TLSv1.1/TLSv1.2) 2020-07-23 08:13:55.966Z INFO - -> SunJCE: SunJCE info: SunJCE Provider (implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, Diffie-Hellman, HMAC) 2020-07-23 08:13:55.966Z INFO - -> SunProvider: SunJGSS info: Sun (Kerberos v5, SPNEGO) 2020-07-23 08:13:55.966Z INFO - -> Provider: SunSASL info: Sun SASL provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, GSSAPI, CRAM-MD5, NTLM) 2020-07-23 08:13:55.966Z INFO - -> XMLDSigRI: XMLDSig info: XMLDSig (DOM XMLSignatureFactory;
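Paul's finding can be illustrated with the two registration calls from `java.security.Security`. The dummy provider below registers no services and is purely illustrative; the point is only the list position each call produces.

```java
import java.security.Provider;
import java.security.Security;

public class ProviderOrderSketch {
    // Inert provider used only to demonstrate list positions.
    static class DummyProvider extends Provider {
        DummyProvider(String name) {
            super(name, 1.0, "illustrative provider, not a real JSSE provider");
        }
    }

    public static void main(String[] args) {
        // insertProviderAt(p, 1) makes p the FIRST provider consulted, so a
        // third-party provider can shadow SunJSSE and change what
        // SSLSession.getApplicationBufferSize() returns -- the failure mode
        // described in the comment above.
        Security.insertProviderAt(new DummyProvider("DummyFirst"), 1);
        // addProvider appends to the END of the list, leaving existing TLS
        // behavior intact -- the workaround that made the issue disappear.
        Security.addProvider(new DummyProvider("DummyLast"));
        Provider[] providers = Security.getProviders();
        System.out.println(providers[0].getName() + " ... "
                + providers[providers.length - 1].getName());
    }
}
```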
[jira] [Updated] (KAFKA-9343) Add ps command for Kafka and zookeeper process on z/OS.
[ https://issues.apache.org/jira/browse/KAFKA-9343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuo Zhang updated KAFKA-9343: -- Fix Version/s: 2.6.0 > Add ps command for Kafka and zookeeper process on z/OS. > --- > > Key: KAFKA-9343 > URL: https://issues.apache.org/jira/browse/KAFKA-9343 > Project: Kafka > Issue Type: Task > Components: tools >Affects Versions: 2.4.0 > Environment: z/OS, OS/390 >Reporter: Shuo Zhang >Assignee: Shuo Zhang >Priority: Major > Labels: OS/390, z/OS > Fix For: 2.5.0, 2.6.0, 2.4.2 > > Original Estimate: 168h > Remaining Estimate: 168h > > +Note: since the final change scope changed, I changed the summary and > description.+ > The existing method of checking for the Kafka process on other platforms isn't > applicable to z/OS; on z/OS, the best keyword we can use is the JOBNAME. > PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk > '\{print $1}') > --> > PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v > grep | awk > '\{print $1}') > The same applies to the zookeeper process. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on pull request #8295: URL: https://github.com/apache/kafka/pull/8295#issuecomment-662968116 Rebased on trunk This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8037) KTable restore may load bad data
[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163485#comment-17163485 ] Bruno Cadonna commented on KAFKA-8037: -- I find the idea of putting records in their original byte representation into state stores on source tables, in combination with deserializing records during restoration, interesting. With this, we would have in the state store the same data as in the source topic minus the bad data. Having these two mechanisms in place would allow us to switch the optimization on by default without any further restrictions on serdes, since only the deserializer is used and never the serializer. > KTable restore may load bad data > > > Key: KAFKA-8037 > URL: https://issues.apache.org/jira/browse/KAFKA-8037 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: pull-request-available > > If an input topic contains bad data, users can specify a > `deserialization.exception.handler` to drop corrupted records on read. > However, this mechanism may be by-passed on restore. Assume a > `builder.table()` call reads and drops a corrupted record. If the table state > is lost and restored from the changelog topic, the corrupted record may be > copied into the store, because on restore plain bytes are copied. > If the KTable is used in a join, an internal `store.get()` call to lookup the > record would fail with a deserialization exception if the value part cannot > be deserialized. > GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for > GlobalKTable case). It's unclear to me atm, how this issue could be addressed > for KTables though. > Note, that user state stores are not affected, because they always have a > dedicated changelog topic (and don't reuse an input topic) and thus the > corrupted record would not be written into the changelog. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME
[ https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163590#comment-17163590 ] Thiago Santos commented on KAFKA-9531: -- I am experiencing the same problem. When i start a local Kafka cluster with docker-compose. The kafka-connect producer gets stuck in this loop when i stop one of the containers in the Kafka cluster. Any update about this issue? {code:java} // code placeholder kafka-connect | java.net.UnknownHostException: kafka3kafka-connect | java.net.UnknownHostException: kafka3kafka-connect | at java.net.InetAddress.getAllByName0(InetAddress.java:1281)kafka-connect | at java.net.InetAddress.getAllByName(InetAddress.java:1193)kafka-connect | at java.net.InetAddress.getAllByName(InetAddress.java:1127)kafka-connect | at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)kafka-connect | at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)kafka-connect | at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)kafka-connect | at java.lang.Thread.run(Thread.java:748) {code} > java.net.UnknownHostException loop on VM rolling update using CNAME > --- > > Key: KAFKA-9531 > URL: https://issues.apache.org/jira/browse/KAFKA-9531 > Project: Kafka > Issue Type: Bug > Components: clients, 
controller, network, producer >Affects Versions: 2.4.0 >Reporter: Rui Abreu >Priority: Major > > Hello, > > My cluster setup in based on VMs behind DNS CNAME . > Example: node.internal is a CNAME to either nodeA.internal or nodeB.internal > Since kafka-client 1.2.1, it has been observed that sometimes Kafka clients > get stuck on a loop with the exception: > Example after nodeB.internal is replaced with nodeA.internal > > {code:java} > 2020-02-10T12:11:28.181Z o.a.k.c.NetworkClient [WARN]- [Consumer > clientId=consumer-6, groupId=consumer.group] Error connecting to node > nodeB.internal:9092 (id: 2 rack: null) > java.net.UnknownHostException: nodeB.internal:9092 > at java.net.InetAddress.getAllByName0(InetAddress.java:1281) > ~[?:1.8.0_222] > at java.net.InetAddress.getAllByName(InetAddress.java:1193) > ~[?:1.8.0_222] > at java.net.InetAddress.getAllByName(InetAddress.java:1127) > ~[?:1.8.0_222] > at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:943) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:68) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1114) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1005) > ~[stormjar.jar:?] > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:537) > ~[stormjar.jar:?] 
> at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) > ~[stormjar.jar:?] >
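Both reports above share a pattern: once a node's hostname stops resolving, the client keeps looping on `UnknownHostException` for that node instead of recovering when the CNAME points somewhere new. One defensive design is to re-resolve the hostname on each reconnect attempt rather than failing forever on a cached (failed) lookup. A minimal, dependency-free sketch of that idea — all names are hypothetical, and the resolver is injected so the logic runs without real DNS:

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical per-node connection state that re-resolves the node's
// hostname on every reconnect attempt instead of keeping a stale result.
final class NodeConnectionState {
    private final String host;
    private final Function<String, List<String>> resolver;
    private List<String> addresses; // lazily (re-)resolved

    NodeConnectionState(String host, Function<String, List<String>> resolver) {
        this.host = host;
        this.resolver = resolver;
    }

    // Called before each connect attempt; an empty/absent resolution is retried.
    String currentAddress() {
        if (addresses == null || addresses.isEmpty()) {
            addresses = resolver.apply(host); // re-resolve instead of failing forever
        }
        return addresses.isEmpty() ? null : addresses.get(0);
    }

    // On connection failure, drop the cached addresses so the next
    // attempt performs a fresh DNS lookup (picking up CNAME changes).
    void connectionFailed() {
        addresses = null;
    }
}
```

In the real client the resolver would be `InetAddress.getAllByName`; the sketch only illustrates the cache-invalidation direction, not the actual KAFKA-9531 fix.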
[jira] [Comment Edited] (KAFKA-10205) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163394#comment-17163394 ] Igor Soarez edited comment on KAFKA-10205 at 7/23/20, 2:01 PM: --- I'm happy to take this one on. I've opened [https://github.com/apache/kafka/pull/9064] was (Author: soarez): I'm happy to take this one on. > NullPointerException in StreamTask > -- > > Key: KAFKA-10205 > URL: https://issues.apache.org/jira/browse/KAFKA-10205 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Brian Forkan >Assignee: Igor Soarez >Priority: Minor > Labels: beginner, newbie > > In our Kafka Streams application we have been experiencing a > NullPointerException when deploying a new version of our application. This > does not happen during a normal rolling restart. > The exception is: > {code:java} > Error caught during partition assignment, will abort the current process and > re-throw at the end of > rebalance","stack_trace":"java.lang.NullPointerException: nullError caught > during partition assignment, will abort the current process and re-throw at > the end of rebalance","stack_trace":"java.lang.NullPointerException: null at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120) > at > 
org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) > at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at > brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) > {code} > And the relevant lines of code - > [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196] > I suspect "topology.source(partition.topic());" is returning null. > Has anyone experienced this issue before? I suspect there is a problem with > our topology but I can't replicate this on my machine so I can't tell. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vitojeng opened a new pull request #9069: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries
vitojeng opened a new pull request #9069: URL: https://github.com/apache/kafka/pull/9069 Follow-up to #8200. KAFKA-5876's PR is broken into multiple parts; this PR is part 2: apply UnknownStateStoreException. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
mjsax commented on a change in pull request #9060: URL: https://github.com/apache/kafka/pull/9060#discussion_r459793713 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams // have existed with the expected number of partitions, or some create topic returns fatal errors. log.debug("Starting to validate internal topics {} in partition assignor.", topics); -int remainingRetries = retries; +long currentWallClockMs = time.milliseconds(); +final long deadlineMs = currentWallClockMs + retryTimeoutMs; Review comment: I am happy to add a check in `StreamsConfig` and either throw or log a WARN depending on how strict we want to be. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
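The diff above replaces a fixed `retries` budget with a wall-clock deadline (`deadlineMs = currentWallClockMs + retryTimeoutMs`). The shape of that migration can be sketched independently of Kafka; the helper and parameter names below are hypothetical, and the clock is injected so the loop is testable without sleeping:

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Sketch of the count-to-deadline migration: instead of decrementing a
// fixed retry count, keep retrying until retryTimeoutMs of wall-clock
// time has elapsed.
final class DeadlineRetry {
    // Retries `attempt` until it succeeds or the deadline passes.
    // Returns true on success, false on timeout.
    static boolean retryUntilDeadline(BooleanSupplier attempt,
                                      long retryTimeoutMs,
                                      LongSupplier clockMs) {
        final long deadlineMs = clockMs.getAsLong() + retryTimeoutMs;
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (clockMs.getAsLong() >= deadlineMs) {
                return false; // mirrors the "could not ... within N ms" error path
            }
            // a real implementation would back off here (cf. retry.backoff.ms)
        }
    }
}
```

A deadline composes better than a count when individual attempts have variable latency, which is the motivation behind removing `retries` in KIP-572-style changes.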
[GitHub] [kafka] guozhangwang commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
guozhangwang commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r459799708 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, restoreRecords.add(recordConverter.convert(record)); } } -offset = globalConsumer.position(topicPartition); +try { +offset = globalConsumer.position(topicPartition); +} catch (final TimeoutException error) { +// the `globalConsumer.position()` call should never block, because we know that we did +// a successful `position()` call above for the requested partition and thus the consumer +// should have a valid local position that it can return immediately + +// hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException` +throw new IllegalStateException(error); +} + stateRestoreAdapter.restoreBatch(restoreRecords); stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size()); restoreCount += restoreRecords.size(); Review comment: My understanding is that for non-global state stores, we would also start ticking if we cannot make progress either due to exceptions or `poll()` returned no data, is that right? If yes, I'm +1 on covering the same for global state store here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
guozhangwang commented on a change in pull request #9060: URL: https://github.com/apache/kafka/pull/9060#discussion_r459800202 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -154,27 +164,34 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams "Error message was: {}", topicName, cause.toString()); throw new StreamsException(String.format("Could not create topic %s.", topicName), cause); } +} catch (final TimeoutException retryableException) { +log.error("Creating topic {} timed out.\n" + +"Error message was: {}", topicName, retryableException.toString()); } } } if (!topicsNotReady.isEmpty()) { -log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, remainingRetries); +currentWallClockMs = time.milliseconds(); -Utils.sleep(retryBackOffMs); +if (currentWallClockMs >= deadlineMs) { +final String timeoutError = String.format("Could not create topics within %d milliseconds. " + +"This can happen if the Kafka cluster is temporary not available.", retryTimeoutMs); +log.error(timeoutError); +// TODO: should we throw a different exception instead and catch it, to return a `INCOMPLETE_SOURCE_TOPIC_METADATA` error code Review comment: I'm thinking that moving forward we should try not to create internal topics during rebalance but pre-create them at startup; for now, assuming this is still the case, I think letting the whole application die is fine --- i.e. treat it the same as source topics. Hence I'm leaning towards encoding INCOMPLETE_SOURCE_TOPIC_METADATA to shut down the whole app, across all clients. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vitojeng commented on a change in pull request #9069: KAFKA-5876: Apply UnknownStateStoreException for Interactive Queries
vitojeng commented on a change in pull request #9069: URL: https://github.com/apache/kafka/pull/9069#discussion_r459805054 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -693,6 +695,16 @@ public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing( streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0); } +@Test(expected = UnknownStateStoreException.class) Review comment: I am not sure if using the annotation is a good choice. Would it be better to use `assertThrows`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
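The reason `assertThrows` is usually preferred over `@Test(expected = ...)` is that it pins the expected exception to one specific statement, so an earlier line throwing the same exception type cannot produce a false pass. A dependency-free sketch of the mechanism (in the PR itself this would be JUnit's `Assert.assertThrows`):

```java
// Minimal assertThrows-style helper illustrating why statement-scoped
// exception checks beat the @Test(expected = ...) annotation.
final class AssertThrowsSketch {
    interface Executable { void run() throws Throwable; }

    static <T extends Throwable> T assertThrows(Class<T> expected, Executable body) {
        try {
            body.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t); // caller can further inspect message, cause, ...
            }
            throw new AssertionError("expected " + expected.getName() + " but got " + t, t);
        }
        throw new AssertionError("expected " + expected.getName() + " but nothing was thrown");
    }
}
```

In the test under review, only the `streams.store(...)` call that should fail would be wrapped, leaving the setup code outside the assertion.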
[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17164063#comment-17164063 ] Guozhang Wang commented on KAFKA-10284: --- What [~ableegoldman] described seems aligned with this, but I thought that fenced instance Y should still try to rejoin the group; in [~ableegoldman]'s report, however, that thread did not try to rejoin at all? > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.6.1 > > > For a known static member rejoin, we update its corresponding member.id > without triggering a new rebalance. This serves the purpose of avoiding > an unnecessary rebalance for static membership, as well as a fencing purpose if > something still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. > The bug-find credit goes to [~hachikuji] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10268) dynamic config like "--delete-config log.retention.ms" doesn't work
[ https://issues.apache.org/jira/browse/KAFKA-10268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huxihx resolved KAFKA-10268. Fix Version/s: 2.7.0 Resolution: Fixed > dynamic config like "--delete-config log.retention.ms" doesn't work > --- > > Key: KAFKA-10268 > URL: https://issues.apache.org/jira/browse/KAFKA-10268 > Project: Kafka > Issue Type: Bug > Components: log, log cleaner >Affects Versions: 2.1.1 >Reporter: zhifeng.peng >Assignee: huxihx >Priority: Major > Fix For: 2.7.0 > > Attachments: server.log.2020-07-13-14 > > > After I set "log.retention.ms=301000" to clean the data, I use the cmd > "bin/kafka-configs.sh --bootstrap-server 10.129.104.15:9092 --entity-type > brokers --entity-default --alter --delete-config log.retention.ms" to reset > to default. > Static broker configuration like log.retention.hours is 168h and no topic > level configuration like retention.ms. > It did not actually take effect, although server.log prints the broker > configuration like that. > log.retention.check.interval.ms = 30 > log.retention.hours = 168 > log.retention.minutes = null > {color:#ff}log.retention.ms = null{color} > log.roll.hours = 168 > log.roll.jitter.hours = 0 > log.roll.jitter.ms = null > log.roll.ms = null > log.segment.bytes = 1073741824 > log.segment.delete.delay.ms = 6 > > Then we can see that the retention time is still 301000ms from the server.log and > segments have been deleted. > [2020-07-13 14:30:00,958] INFO [Log partition=test_retention-2, > dir=/data/kafka_logs-test] Found deletable segments with base offsets > [5005329,6040360] due to retention time 301000ms breach (kafka.log.Log) > [2020-07-13 14:30:00,959] INFO [Log partition=test_retention-2, > dir=/data/kafka_logs-test] Scheduling log segment [baseOffset 5005329, size > 1073741222] for deletion. 
(kafka.log.Log) > [2020-07-13 14:30:00,959] INFO [Log partition=test_retention-2, > dir=/data/kafka_logs-test] Scheduling log segment [baseOffset 6040360, size > 1073728116] for deletion. (kafka.log.Log) > [2020-07-13 14:30:00,959] INFO [Log partition=test_retention-2, > dir=/data/kafka_logs-test] Incrementing log start offset to 7075648 > (kafka.log.Log) > [2020-07-13 14:30:00,960] INFO [Log partition=test_retention-0, > dir=/data/kafka_logs-test] Found deletable segments with base offsets > [5005330,6040410] {color:#FF}due to retention time 301000ms{color} breach > (kafka.log.Log) > [2020-07-13 14:30:00,960] INFO [Log partition=test_retention-0, > dir=/data/kafka_logs-test] Scheduling log segment [baseOffset 5005330, size > 1073732368] for deletion. (kafka.log.Log) > [2020-07-13 14:30:00,961] INFO [Log partition=test_retention-0, > dir=/data/kafka_logs-test] Scheduling log segment [baseOffset 6040410, size > 1073735366] for deletion. (kafka.log.Log) > [2020-07-13 14:30:00,961] INFO [Log partition=test_retention-0, > dir=/data/kafka_logs-test] Incrementing log start offset to 7075685 > (kafka.log.Log) > [2020-07-13 14:31:00,959] INFO [Log partition=test_retention-2, > dir=/data/kafka_logs-test] Deleting segment 5005329 (kafka.log.Log) > [2020-07-13 14:31:00,959] INFO [Log partition=test_retention-2, > dir=/data/kafka_logs-test] Deleting segment 6040360 (kafka.log.Log) > [2020-07-13 14:31:00,961] INFO [Log partition=test_retention-0, > dir=/data/kafka_logs-test] Deleting segment 5005330 (kafka.log.Log) > [2020-07-13 14:31:00,961] INFO [Log partition=test_retention-0, > dir=/data/kafka_logs-test] Deleting segment 6040410 (kafka.log.Log) > [2020-07-13 14:31:01,144] INFO Deleted log > /data/kafka_logs-test/test_retention-2/06040360.log.deleted. > (kafka.log.LogSegment) > [2020-07-13 14:31:01,144] INFO Deleted offset index > /data/kafka_logs-test/test_retention-2/06040360.index.deleted. 
> (kafka.log.LogSegment) > [2020-07-13 14:31:01,144] INFO Deleted time index > /data/kafka_logs-test/test_retention-2/06040360.timeindex.deleted. > (kafka.log.LogSegment) > > Here are a few steps to reproduce it. > 1、set log.retention.ms=301000: > bin/kafka-configs.sh --bootstrap-server 10.129.104.15:9092 --entity-type > brokers --entity-default --alter --add-config log.retention.ms=301000 > 2、produce messages to the topic: > bin/kafka-producer-perf-test.sh --topic test_retention --num-records 1000 > --throughput -1 --producer-props bootstrap.servers=10.129.104.15:9092 > --record-size 1024 > 3、reset log.retention.ms to the default: > bin/kafka-configs.sh --bootstrap-server 10.129.104.15:9092 --entity-type > brokers --entity-default --alter --delete-config log.retention.ms > > I have attached server.log. You can see the log from row 238 to row 731. -- This message was sent by Atlassian Jira
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r459790103 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce } final Set changelogTopics = new HashSet<>(); -for (final StateStore stateStore : globalStateStores) { + +long deadlineMs = NO_DEADLINE; +final List storesToInitialize = new LinkedList<>(globalStateStores); + +while (!storesToInitialize.isEmpty()) { +// we remove and add back on failure to round-robin through all stores +final StateStore stateStore = storesToInitialize.remove(0); globalStoreNames.add(stateStore.name()); final String sourceTopic = storeToChangelogTopic.get(stateStore.name()); changelogTopics.add(sourceTopic); -stateStore.init(globalProcessorContext, stateStore); + +try { +stateStore.init(globalProcessorContext, stateStore); +deadlineMs = NO_DEADLINE; +} catch (final RetryableErrorException retryableException) { +if (taskTimeoutMs == 0L) { +throw new StreamsException(retryableException.getCause()); +} + +storesToInitialize.add(stateStore); + +final long currentWallClockMs = time.milliseconds(); +if (deadlineMs == NO_DEADLINE) { +final long newDeadlineMs = currentWallClockMs + taskTimeoutMs; +deadlineMs = newDeadlineMs < 0L ? Long.MAX_VALUE : newDeadlineMs; +} else if (currentWallClockMs > deadlineMs) { +throw new TimeoutException(String.format( +"Global task did not make progress to restore state within %d ms. Adjust `task.timeout.ms` if needed.", +currentWallClockMs - deadlineMs + taskTimeoutMs +)); +} + +log.debug(retryableException.getMessage() + " Will retry. Remaining time in milliseconds: {}", deadlineMs - currentWallClockMs); Review comment: I understood what you meant, but I thought that because we actually do retry, it's better/cleaner/less-noisy to not log the full stack trace? 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
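One detail in the diff discussed above is easy to miss: the deadline is computed as `currentWallClockMs + taskTimeoutMs` and guarded with `newDeadlineMs < 0L ? Long.MAX_VALUE : newDeadlineMs`. Without that guard, a huge timeout (e.g. `Long.MAX_VALUE`, meaning "retry forever") overflows to a negative long, making every deadline check pass immediately. Isolated as a helper with a hypothetical name:

```java
// Overflow-safe deadline computation, mirroring the guard in the PR diff:
// now + timeout can wrap past Long.MAX_VALUE to a negative value, which
// would turn an "infinite" timeout into an instantly expired one.
final class Deadlines {
    static long deadlineMs(long nowMs, long timeoutMs) {
        final long deadline = nowMs + timeoutMs;
        return deadline < 0L ? Long.MAX_VALUE : deadline; // overflow -> no deadline
    }
}
```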
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r459790323 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, restoreRecords.add(recordConverter.convert(record)); } } -offset = globalConsumer.position(topicPartition); +try { Review comment: > I can't comment above, but can the poll call itself throw a timeout exception? No, it cannot. It would just return an empty result. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r459791370 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, restoreRecords.add(recordConverter.convert(record)); } } -offset = globalConsumer.position(topicPartition); +try { +offset = globalConsumer.position(topicPartition); +} catch (final TimeoutException error) { +// the `globalConsumer.position()` call should never block, because we know that we did +// a successful `position()` call above for the requested partition and thus the consumer +// should have a valid local position that it can return immediately Review comment: I guess strictly speaking it's not necessary, but I personally prefer this strict style as it surfaces bugs more quickly. If we write "robust" code, it could hide a bug. -- It's just a personal preference and we have similar code elsewhere. If you feel very strongly about it, I am also ok to change it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r459791741 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, restoreRecords.add(recordConverter.convert(record)); } } -offset = globalConsumer.position(topicPartition); +try { +offset = globalConsumer.position(topicPartition); +} catch (final TimeoutException error) { +// the `globalConsumer.position()` call should never block, because we know that we did +// a successful `position()` call above for the requested partition and thus the consumer +// should have a valid local position that it can return immediately + +// hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException` +throw new IllegalStateException(error); +} + stateRestoreAdapter.restoreBatch(restoreRecords); stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size()); restoreCount += restoreRecords.size(); Review comment: Yes, it would block forever today. Curious to hear what @guozhangwang thinks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] huxihx commented on pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
huxihx commented on pull request #9051: URL: https://github.com/apache/kafka/pull/9051#issuecomment-663309863 @rajinisivaram Thanks for the review, merging to trunk This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] huxihx merged pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
huxihx merged pull request #9051: URL: https://github.com/apache/kafka/pull/9051 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
showuon commented on a change in pull request #9029: URL: https://github.com/apache/kafka/pull/9029#discussion_r459818087 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -62,6 +65,7 @@ private static final int NUM_PARTITIONS = 10; private static final int RECORD_TRANSFER_DURATION_MS = 20_000; private static final int CHECKPOINT_DURATION_MS = 20_000; +private static final int OFFSET_SYNC_DURATION_MS = 30_000; private Time time = Time.SYSTEM; Review comment: Nice catch! I also removed the unused import `import org.apache.kafka.common.utils.Time;` Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] piddubnyi commented on pull request #9017: KAFKA-8582 [WIP] Add ability to handle late messages in streams-aggregation
piddubnyi commented on pull request #9017: URL: https://github.com/apache/kafka/pull/9017#issuecomment-663037679 Hi @mjsax, I commented on the ticket to get it assigned. Regarding the KIP, I just created a fresh account in Confluence; however, it looks like the account has no rights to create the KIP. Please suggest the best way to proceed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
mimaison commented on a change in pull request #9029: URL: https://github.com/apache/kafka/pull/9029#discussion_r459460614 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -62,6 +65,7 @@ private static final int NUM_PARTITIONS = 10; private static final int RECORD_TRANSFER_DURATION_MS = 20_000; private static final int CHECKPOINT_DURATION_MS = 20_000; +private static final int OFFSET_SYNC_DURATION_MS = 30_000; private Time time = Time.SYSTEM; Review comment: We can remove this field now that it's unused ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException { backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count()); } +private void waitForConsumerGroupOffsetSync(Consumer consumer, List topics, String consumerGroupId) +throws InterruptedException { +Admin backupClient = backup.kafka().createAdminClient(); +List tps = new ArrayList<>(NUM_PARTITIONS * topics.size()); +IntStream.range(0, NUM_PARTITIONS).forEach( Review comment: I'm not sure this is much better than a simple `for` loop. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
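The review above questions whether `IntStream.range(...).forEach(...)` is clearer than a plain `for` loop for building the list of topic-partitions. Both shapes are sketched below using `"topic-partition"` strings in place of Kafka's `TopicPartition`, so the comparison stands alone; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// The two loop shapes from the review thread; both produce the same list.
final class PartitionListBuilder {
    // The stream-based form from the PR under review.
    static List<String> withStreams(List<String> topics, int numPartitions) {
        List<String> tps = new ArrayList<>(numPartitions * topics.size());
        IntStream.range(0, numPartitions).forEach(
            p -> topics.forEach(t -> tps.add(t + "-" + p)));
        return tps;
    }

    // The plain nested for loop the reviewer suggests; same result,
    // arguably easier to read and to step through in a debugger.
    static List<String> withForLoops(List<String> topics, int numPartitions) {
        List<String> tps = new ArrayList<>(numPartitions * topics.size());
        for (int p = 0; p < numPartitions; p++) {
            for (String t : topics) {
                tps.add(t + "-" + p);
            }
        }
        return tps;
    }
}
```

Since the stream version mutates an external list from `forEach`, it gains none of the usual stream benefits, which supports the reviewer's preference for the simple loop.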
[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
asdaraujo commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r459536318 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java ## @@ -132,7 +132,7 @@ public String version() { return listConsumerGroupOffsets(group).entrySet().stream() .filter(x -> shouldCheckpointTopic(x.getKey().topic())) .map(x -> checkpoint(group, x.getKey(), x.getValue())) -.filter(x -> x.downstreamOffset() > 0) // ignore offsets we cannot translate accurately +.filter(x -> x.downstreamOffset() >= 0) // ignore offsets we cannot translate accurately Review comment: @ryannedolan @heritamas If you guys think this is safe enough I can remove that filter. But it doesn't hurt to leave it there just in case a rogue negative offset comes through for whatever reason/bug... Please let me know what you think. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459563205 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition, isr: Set[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int]): Unit = { -remoteReplicasMap.clear() +val replicaSet = assignment.toSet +val removedReplicas = remoteReplicasMap.keys -- replicaSet + assignment .filter(_ != localBrokerId) .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) - +removedReplicas.foreach(remoteReplicasMap.remove) Review comment: Would `remoteReplicasMap --= removedReplicas` work here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies
mjsax commented on a change in pull request #9064: URL: https://github.com/apache/kafka/pull/9064#discussion_r459581717 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java ## @@ -49,6 +49,9 @@ /** * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology. * + * It is a requirement that the processing logic (Topology) be defined in a deterministic way. Review comment: Nit: insert `<p>` tag to actually get the new paragraph rendered. Nit: `Topology` -> `{@link Topology}` It's not really clear what "deterministic" means. We should elaborate more. ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java ## @@ -49,6 +49,9 @@ /** * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology. * + * It is a requirement that the processing logic (Topology) be defined in a deterministic way. + * If different instances build different runtime code logic the resulting behavior may be unexpected. Review comment: "different" for sure, but this implies that one might have an operator the other does not. The observed issue is that even if both contain the same operator, they might be added in a different order (and thus be named differently) to the `Topology`, thus we should stress that order matters. ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -1984,6 +1985,24 @@ public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() { task.initializeStateStores(); } +@Test(expected = TopologyException.class) Review comment: We should not use this annotation but rather use `assertThrows` (we still have some code that does not use `assertThrows` but we try to lazily migrate our tests, as it provides a better test pattern). 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -181,8 +182,16 @@ public StreamTask(final TaskId id, final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor(); final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler(); +final Set sourceTopics = topology.sourceTopics(); for (final TopicPartition partition : partitions) { -final SourceNode source = topology.source(partition.topic()); +final String topicName = partition.topic(); +if (!sourceTopics.contains(topicName)) { +throw new TopologyException( +"Topic not found " + topicName + ". Is the Streams Topology built in a deterministic way?" Review comment: `Topic not found` sounds as if the topic was not found in the cluster -- however, what actually happened is that we received a record but the record's topic is unknown in the sub-topology. Similar to above, "deterministic" is not really easy to understand. I would also not phrase it as a question, but as a statement: ``` ... This may happen if different KafkaStreams instances of the same application execute different Topologies. Note that Topologies are only identical if all operators are added in the same order. ``` Or similar. 
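The guard being discussed can be sketched in isolation as follows. This is a hypothetical simplification, not Kafka's actual `StreamTask` code: `TopologyException` and `checkSourceTopic` here are illustrative stand-ins, and the message uses the reviewer's suggested wording.

```java
import java.util.Set;

// Hypothetical sketch of the guard discussed above: before wiring a partition to
// a source node, verify the partition's topic is known to this sub-topology.
public class SourceTopicGuard {
    public static class TopologyException extends RuntimeException {
        public TopologyException(String message) { super(message); }
    }

    public static void checkSourceTopic(Set<String> sourceTopics, String topicName) {
        if (!sourceTopics.contains(topicName)) {
            // Reviewer's suggested phrasing: a statement, not a question.
            throw new TopologyException(
                "Topic " + topicName + " is unknown to the sub-topology. This may happen if "
                + "different KafkaStreams instances of the same application execute different "
                + "Topologies. Note that Topologies are only identical if all operators are "
                + "added in the same order.");
        }
    }

    public static void main(String[] args) {
        checkSourceTopic(Set.of("input-a", "input-b"), "input-a"); // known topic: passes
        try {
            checkSourceTopic(Set.of("input-a"), "input-b");        // unknown topic: throws
        } catch (TopologyException e) {
            System.out.println("rejected: input-b");
        }
    }
}
```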
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163759#comment-17163759 ] Guozhang Wang commented on KAFKA-10134: --- [~zhowei] Did your run include both the log4j improvement and the other PR depending on fetchable partitions to do long polling? > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > Attachments: consumer5.log.2020-07-22.log > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. 
> > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions
[ https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-10301: Description: In Partition#updateAssignmentAndIsr, we would previously update the `partition#remoteReplicasMap` by adding the new replicas to the map and then removing the old ones ([source](https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)). During a recent refactoring, we changed it to first clear the map and then add all the replicas to it ([source](https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663)). While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not all callers that access the map structure use a lock. Some examples: - Partition#updateFollowerFetchState - DelayedDeleteRecords#tryComplete - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` without a lock, which itself is called by DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. While we want to polish the code to ensure these sorts of race conditions become harder (or impossible) to introduce, it sounds safest to revert to the previous behavior given the timelines regarding the 2.6 release. Jira https://issues.apache.org/jira/browse/KAFKA-10302 tracks that. 
was: In Partition#updateAssignmentAndIsr, we would previously update the `partition#remoteReplicasMap` by adding the new replicas to the map and then removing the old ones ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] During a recent refactoring, we changed it to first clear the map and then add all the replicas to it ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not all callers that access the map structure use a lock. Some examples: - Partition#updateFollowerFetchState - DelayedDeleteRecords#tryComplete - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` without a lock, which itself is called by DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. While we want to polish the code to ensure these sort of race conditions become harder (or impossible) to introduce, it sounds safest to revert to the previous behavior given the timelines regarding the 2.6 release. Jira X tracks that. 
> Partition#remoteReplicasMap can be empty in certain race conditions > --- > > Key: KAFKA-10301 > URL: https://issues.apache.org/jira/browse/KAFKA-10301 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Blocker > > In Partition#updateAssignmentAndIsr, we would previously update the > `partition#remoteReplicasMap` by adding the new replicas to the map and then > removing the old ones > ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] > During a recent refactoring, we changed it to first clear the map and then > add all the replicas to it > ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) > While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not > all callers that access the map structure use a lock. Some examples: > - Partition#updateFollowerFetchState > - DelayedDeleteRecords#tryComplete > - Partition#getReplicaOrException - called in > `checkEnoughReplicasReachOffset` without a lock, which itself is called by > DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. > While we want to polish the code to ensure these sort of race conditions > become harder (or impossible) to introduce, it sounds safest to revert to the > previous behavior given the timelines regarding the 2.6 release. Jira > https://issues.apache.org/jira/browse/KAFKA-10302 tracks that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10302) Ensure thread-safe access to Partition#remoteReplicasMap
Stanislav Kozlovski created KAFKA-10302: --- Summary: Ensure thread-safe access to Partition#remoteReplicasMap Key: KAFKA-10302 URL: https://issues.apache.org/jira/browse/KAFKA-10302 Project: Kafka Issue Type: Bug Reporter: Stanislav Kozlovski A recent Jira (https://issues.apache.org/jira/browse/KAFKA-10301) exposed how easy it is to introduce nasty race conditions with the Partition#remoteReplicasMap data structure. It is a concurrent map which is modified inside a write lock but it is not always accessed through that lock. Therefore it's possible for callers to access an intermediate state of the map, for instance in between updating the replica assignment for a given partition. It would be good to ensure thread-safe access to the data structure in a way which makes it harder to introduce such regressions in the future.
[GitHub] [kafka] stanislavkozlovski opened a new pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski opened a new pull request #9065: URL: https://github.com/apache/kafka/pull/9065 We would previously update the map by adding the new replicas to the map and then removing the old ones. During a recent refactoring, we changed the logic to first clear the map and then add all the replicas to it. While this is done in a write lock, not all callers that access the map structure use a lock. It is safer to revert to the previous behavior of showing the intermediate state of the map with extra replicas, rather than an intermediate state of the map with no replicas.
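The distinction the PR description draws can be shown with a simplified model (plain `ConcurrentHashMap` with `String` values standing in for `Replica` objects; not Kafka's actual code). A reader that does not hold the write lock can observe the map mid-update, and the two strategies expose very different intermediate states:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the two update strategies for Partition#remoteReplicasMap.
// An unlocked reader can observe intermediate states; the point of the revert is
// that "extra replicas" is a far safer intermediate state than "no replicas".
public class ReplicaMapUpdate {
    // Clear-then-add: between clear() and the puts, the map is completely empty.
    static void clearThenAdd(Map<Integer, String> map, Set<Integer> assignment) {
        map.clear();
        // <-- an unlocked reader here sees an EMPTY map
        for (int id : assignment) map.putIfAbsent(id, "replica-" + id);
    }

    // Add-then-remove (the reverted-to behavior): the map only ever holds the old
    // set, a superset, or the exact target set -- never an empty intermediate state.
    static void addThenRemove(Map<Integer, String> map, Set<Integer> assignment) {
        Set<Integer> removed = new HashSet<>(map.keySet());
        removed.removeAll(assignment);
        for (int id : assignment) map.putIfAbsent(id, "replica-" + id);
        // <-- an unlocked reader here sees old + new replicas, which is tolerable
        for (int id : removed) map.remove(id);
    }

    public static void main(String[] args) {
        Map<Integer, String> map =
            new ConcurrentHashMap<>(Map.of(1, "replica-1", 2, "replica-2"));
        addThenRemove(map, Set.of(2, 3));
        System.out.println(new TreeSet<>(map.keySet())); // final state: [2, 3]
    }
}
```

Both methods reach the same final state; only the states visible to concurrent, unlocked readers differ.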
[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663069630 ok to test
[GitHub] [kafka] stanislavkozlovski commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663088134 Currently working on introducing a test case for this
[GitHub] [kafka] mjsax commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
mjsax commented on a change in pull request #9060: URL: https://github.com/apache/kafka/pull/9060#discussion_r459577643 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams // have existed with the expected number of partitions, or some create topic returns fatal errors. log.debug("Starting to validate internal topics {} in partition assignor.", topics); -int remainingRetries = retries; +long currentWallClockMs = time.milliseconds(); +final long deadlineMs = currentWallClockMs + retryTimeoutMs; Review comment: Good question. Default `max.poll.interval.ms` is 5 minutes (ie, the deadline is set to 2.5 minutes by default) while default `api.default.timeout.ms` is 1 minute? Thus we might be ok?
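The diff above replaces a retry counter with a wall-clock deadline. The general pattern can be sketched generically as follows; this is not Kafka's `InternalTopicManager` code, and `retryUntilDeadline` and its parameters are illustrative names. The deadline is computed once, and each failed attempt backs off until the clock passes it:

```java
import java.util.function.BooleanSupplier;

// Generic deadline-based retry loop, sketching the pattern discussed above:
// instead of decrementing a retry counter ("remainingRetries"), compute
// deadlineMs once and keep retrying until the wall clock passes it.
public class DeadlineRetry {
    static boolean retryUntilDeadline(BooleanSupplier attempt,
                                      long retryTimeoutMs,
                                      long retryBackoffMs) throws InterruptedException {
        final long deadlineMs = System.currentTimeMillis() + retryTimeoutMs;
        while (true) {
            if (attempt.getAsBoolean()) return true;                     // succeeded
            if (System.currentTimeMillis() >= deadlineMs) return false;  // timed out
            Thread.sleep(retryBackoffMs);                                // back off, then retry
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Stand-in for the real validation call: succeeds on the third attempt,
        // well inside the 1-second deadline.
        boolean ok = retryUntilDeadline(() -> ++calls[0] >= 3, 1000L, 10L);
        System.out.println(ok + " after " + calls[0] + " attempts");
    }
}
```

A timeout composes more predictably with an outer budget such as `max.poll.interval.ms` than a retry count does, since the count says nothing about how long each attempt takes; which is exactly the interaction the reviewers are weighing here.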
[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-663109443 @chia7712: Only 6 test failures in the latest run with your PR. http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-23--001.1595503536--chia7712--fix_8334_avoid_deadlock--3462b0008/report.html I will do another run on trunk for comparison.
[jira] [Created] (KAFKA-10301) RemoteReplicasMap can be empty in certain race conditions
Stanislav Kozlovski created KAFKA-10301: --- Summary: RemoteReplicasMap can be empty in certain race conditions Key: KAFKA-10301 URL: https://issues.apache.org/jira/browse/KAFKA-10301 Project: Kafka Issue Type: Bug Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski In Partition#updateAssignmentAndIsr, we would previously update the `partition#remoteReplicasMap` by adding the new replicas to the map and then removing the old ones ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] During a recent refactoring, we changed it to first clear the map and then add all the replicas to it ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not all callers that access the map structure use a lock. Some examples: - Partition#updateFollowerFetchState - DelayedDeleteRecords#tryComplete - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` without a lock, which itself is called by DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. While we want to polish the code to ensure these sort of race conditions become harder (or impossible) to introduce, it sounds safest to revert to the previous behavior given the timelines regarding the 2.6 release. Jira X tracks that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions
[ https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-10301: Summary: Partition#remoteReplicasMap can be empty in certain race conditions (was: RemoteReplicasMap can be empty in certain race conditions) > Partition#remoteReplicasMap can be empty in certain race conditions > --- > > Key: KAFKA-10301 > URL: https://issues.apache.org/jira/browse/KAFKA-10301 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Blocker > > In Partition#updateAssignmentAndIsr, we would previously update the > `partition#remoteReplicasMap` by adding the new replicas to the map and then > removing the old ones > ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] > During a recent refactoring, we changed it to first clear the map and then > add all the replicas to it > ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) > While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not > all callers that access the map structure use a lock. Some examples: > - Partition#updateFollowerFetchState > - DelayedDeleteRecords#tryComplete > - Partition#getReplicaOrException - called in > `checkEnoughReplicasReachOffset` without a lock, which itself is called by > DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. > While we want to polish the code to ensure these sort of race conditions > become harder (or impossible) to introduce, it sounds safest to revert to the > previous behavior given the timelines regarding the 2.6 release. Jira X > tracks that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459527097 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition, isr: Set[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int]): Unit = { -remoteReplicasMap.clear() +val replicaSet = assignment.toSet +val removedReplicas = remoteReplicasMap.keys -- replicaSet + assignment .filter(_ != localBrokerId) .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) - +removedReplicas.foreach(remoteReplicasMap.remove) Review comment: I decided to not get fancy with refactorings - this is literally the old code (https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)
[jira] [Commented] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions
[ https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163686#comment-17163686 ] Stanislav Kozlovski commented on KAFKA-10301: - cc [~rhauch] - this would be good to get in 2.6 > Partition#remoteReplicasMap can be empty in certain race conditions > --- > > Key: KAFKA-10301 > URL: https://issues.apache.org/jira/browse/KAFKA-10301 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Blocker > > In Partition#updateAssignmentAndIsr, we would previously update the > `partition#remoteReplicasMap` by adding the new replicas to the map and then > removing the old ones > ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] > During a recent refactoring, we changed it to first clear the map and then > add all the replicas to it > ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) > While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not > all callers that access the map structure use a lock. Some examples: > - Partition#updateFollowerFetchState > - DelayedDeleteRecords#tryComplete > - Partition#getReplicaOrException - called in > `checkEnoughReplicasReachOffset` without a lock, which itself is called by > DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. > While we want to polish the code to ensure these sort of race conditions > become harder (or impossible) to introduce, it sounds safest to revert to the > previous behavior given the timelines regarding the 2.6 release. Jira > https://issues.apache.org/jira/browse/KAFKA-10302 tracks further > modifications to the code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()
mimaison commented on a change in pull request #9007: URL: https://github.com/apache/kafka/pull/9007#discussion_r459540733 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -2306,6 +2310,22 @@ void handleFailure(Throwable throwable) { return new DescribeLogDirsResult(new HashMap<>(futures)); } +private Map logDirDescriptions(DescribeLogDirsResponse response) { Review comment: This can be static. Also should we keep it in `DescribeLogDirsResponse`? ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception { } } +private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) { +return prepareDescribeLogDirsResponse(error, logDir, +prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false)); +} + +private List prepareDescribeLogDirsTopics( +long partitionSize, long offsetLag, String topic, int partition, boolean isFuture) { +return singletonList(new DescribeLogDirsTopic() +.setName(topic) +.setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition() +.setPartitionIndex(partition) +.setPartitionSize(partitionSize) +.setIsFutureKey(isFuture) +.setOffsetLag(offsetLag; +} + +private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, + List topics) { +return new DescribeLogDirsResponse( +new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult() +.setErrorCode(error.code()) +.setLogDir(logDir) +.setTopics(topics) +))); +} + +@Test +public void testDescribeLogDirs() throws ExecutionException, InterruptedException { +Set brokers = Collections.singleton(0); +String logDir = "/var/data/kafka"; +TopicPartition tp = new TopicPartition("topic", 12); +long partitionSize = 1234567890; 
+long offsetLag = 24; + +try (AdminClientUnitTestEnv env = mockClientEnv()) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponseFrom( +prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, partitionSize, offsetLag), +env.cluster().nodeById(0)); + +DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers); + +Map>> descriptions = result.descriptions(); +assertEquals(brokers, descriptions.keySet()); +assertNotNull(descriptions.get(0)); +assertDescriptionContains(descriptions.get(0).get(), logDir, tp, partitionSize, offsetLag); + +Map> allDescriptions = result.allDescriptions().get(); +assertEquals(brokers, allDescriptions.keySet()); +assertDescriptionContains(allDescriptions.get(0), logDir, tp, partitionSize, offsetLag); +} +} + +private void assertDescriptionContains(Map descriptionsMap, String logDir, Review comment: This can be static ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception { } } +private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) { Review comment: This can be static ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception { } } +private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) { +return prepareDescribeLogDirsResponse(error, logDir, +prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false)); +} + +private List prepareDescribeLogDirsTopics( Review comment: This can be static ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -1057,6 +1059,263 @@ public void 
testDescribeConfigsUnrequested() throws Exception { } } +private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r459499911 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.network.ByteBufferSend; +import org.apache.kafka.common.network.Send; +import org.apache.kafka.common.record.BaseRecords; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.ByteUtils; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.function.Consumer; + +/** + * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer + * of data from a record-set's file channel to the eventual socket channel. + * + * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array + * according to the format specified in {@link DataOutput}. 
When a call is made to writeRecords, any previously written + * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes, + * another Send is passed to the consumer which wraps the underlying record-set's transfer logic. + * + * For example, + * + * + * recordsWritable.writeInt(10); + * recordsWritable.writeRecords(records1); + * recordsWritable.writeInt(20); + * recordsWritable.writeRecords(records2); + * recordsWritable.writeInt(30); + * recordsWritable.flush(); + * + * + * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any + * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is + * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}. + * + * @see org.apache.kafka.common.requests.FetchResponse + */ +public class RecordsWriter implements Writable { +private final String dest; +private final Consumer sendConsumer; +private final ByteBufferOutputStream byteArrayOutputStream; +private final DataOutput output; +private int mark; + +public RecordsWriter(String dest, Consumer sendConsumer) { +this.dest = dest; +this.sendConsumer = sendConsumer; +this.byteArrayOutputStream = new ByteBufferOutputStream(32); +this.output = new DataOutputStream(this.byteArrayOutputStream); +this.mark = 0; +} + +@Override +public void writeByte(byte val) { +try { +output.writeByte(val); +} catch (IOException e) { +throw new RuntimeException("RecordsWriter encountered an IO error", e); +} +} + +@Override +public void writeShort(short val) { +try { +output.writeShort(val); +} catch (IOException e) { +throw new RuntimeException("RecordsWriter encountered an IO error", e); +} +} + +@Override +public void writeInt(int val) { +try { +output.writeInt(val); +} catch (IOException e) { +throw new RuntimeException("RecordsWriter encountered an IO error", e); +} +} + +@Override +public void 
writeLong(long val) { +try { +output.writeLong(val); +} catch (IOException e) { +throw new RuntimeException("RecordsWriter encountered an IO error", e); +} +} + +@Override +public void writeDouble(double val) { +try { +ByteUtils.writeDouble(val, output); +} catch (IOException e) { +throw new RuntimeException("RecordsWriter encountered an IO error", e); +} +} + +@Override +public void writeByteArray(byte[] arr) { +try { +output.write(arr); +} catch (IOException e) { +throw new RuntimeException("RecordsWriter encountered an IO error", e); +} +} + +@Override +public void writeUnsignedVarint(int i) { +
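Every write method in the `RecordsWriter` excerpt above repeats the same try/catch that rethrows `IOException` as an unchecked exception. As a possible refactor (not part of the PR; names here are illustrative), that translation can be factored into one helper taking a throwing lambda:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative refactor of the repeated try/catch in RecordsWriter: a single
// helper translates IOException, and each write method becomes a one-liner.
public class IoWrapSketch {
    @FunctionalInterface
    public interface IoAction {
        void run() throws IOException;
    }

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final DataOutput output = new DataOutputStream(buffer);

    private void unchecked(IoAction action) {
        try {
            action.run();
        } catch (IOException e) {
            // In-memory streams never throw here; the wrapper exists so the
            // Writable-style interface need not declare checked exceptions.
            throw new UncheckedIOException("writer encountered an IO error", e);
        }
    }

    public void writeByte(byte val)  { unchecked(() -> output.writeByte(val)); }
    public void writeShort(short val){ unchecked(() -> output.writeShort(val)); }
    public void writeInt(int val)    { unchecked(() -> output.writeInt(val)); }
    public void writeLong(long val)  { unchecked(() -> output.writeLong(val)); }

    public int size() { return buffer.size(); }

    public static void main(String[] args) {
        IoWrapSketch w = new IoWrapSketch();
        w.writeByte((byte) 1); // 1 byte
        w.writeInt(42);        // 4 bytes
        System.out.println(w.size()); // prints 5
    }
}
```

Whether this is worth it in the actual PR is a judgment call; the lambda indirection trades a little per-call overhead for removing roughly a dozen identical catch blocks.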
[GitHub] [kafka] AshishRoyJava commented on pull request #9034: KAFKA-10246 : AbstractProcessorContext topic() throws NPE
AshishRoyJava commented on pull request #9034: URL: https://github.com/apache/kafka/pull/9034#issuecomment-663066844 @abbccdda Unit test added.
[GitHub] [kafka] dajac commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
dajac commented on a change in pull request #9060: URL: https://github.com/apache/kafka/pull/9060#discussion_r459582710 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams // have existed with the expected number of partitions, or some create topic returns fatal errors. log.debug("Starting to validate internal topics {} in partition assignor.", topics); -int remainingRetries = retries; +long currentWallClockMs = time.milliseconds(); +final long deadlineMs = currentWallClockMs + retryTimeoutMs; Review comment: That's right. I misread the default value of `max.poll.interval.ms`, too many zeros for my eyes ;). The default works fine then. Do we want to protect ourselves if the user changes the default? Or shall we just call out that `api.default.timeout.ms` should be lower than `max.poll.interval.ms` somewhere?
[jira] [Updated] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions
[ https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-10301: Description: In Partition#updateAssignmentAndIsr, we would previously update the `partition#remoteReplicasMap` by adding the new replicas to the map and then removing the old ones ([source](https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)). During a recent refactoring, we changed it to first clear the map and then add all the replicas to it ([source](https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663)). While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not all callers that access the map structure use a lock. Some examples: - Partition#updateFollowerFetchState - DelayedDeleteRecords#tryComplete - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` without a lock, which itself is called by DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. While we want to polish the code to ensure these sorts of race conditions become harder (or impossible) to introduce, it sounds safest to revert to the previous behavior given the timelines regarding the 2.6 release. Jira https://issues.apache.org/jira/browse/KAFKA-10302 tracks further modifications to the code. 
was: In Partition#updateAssignmentAndIsr, we would previously update the `partition#remoteReplicasMap` by adding the new replicas to the map and then removing the old ones ([source](https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)). During a recent refactoring, we changed it to first clear the map and then add all the replicas to it ([source](https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663)). While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not all callers that access the map structure use a lock. Some examples: - Partition#updateFollowerFetchState - DelayedDeleteRecords#tryComplete - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` without a lock, which itself is called by DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. While we want to polish the code to ensure these sorts of race conditions become harder (or impossible) to introduce, it sounds safest to revert to the previous behavior given the timelines regarding the 2.6 release. Jira https://issues.apache.org/jira/browse/KAFKA-10302 tracks that. 
> Partition#remoteReplicasMap can be empty in certain race conditions > --- > > Key: KAFKA-10301 > URL: https://issues.apache.org/jira/browse/KAFKA-10301 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Blocker > > In Partition#updateAssignmentAndIsr, we would previously update the > `partition#remoteReplicasMap` by adding the new replicas to the map and then > removing the old ones > ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] > During a recent refactoring, we changed it to first clear the map and then > add all the replicas to it > ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) > While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not > all callers that access the map structure use a lock. Some examples: > - Partition#updateFollowerFetchState > - DelayedDeleteRecords#tryComplete > - Partition#getReplicaOrException - called in > `checkEnoughReplicasReachOffset` without a lock, which itself is called by > DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. > While we want to polish the code to ensure these sort of race conditions > become harder (or impossible) to introduce, it sounds safest to revert to the > previous behavior given the timelines regarding the 2.6 release. Jira > https://issues.apache.org/jira/browse/KAFKA-10302 tracks further > modifications to the code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
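The race in this ticket comes down to update ordering on a map that lock-free readers may observe mid-update: `clear()` followed by re-insertion leaves a window where the map is empty. A plain-Java sketch (hypothetical names, not Kafka's `Pool` class) of the add-then-remove ordering that the revert restores:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: update a concurrently read map by adding the new
// keys first and only then removing the stale ones, so a lock-free
// reader never observes an empty map mid-update.
public class SafeAssignmentUpdate {
    public static void updateAssignment(ConcurrentHashMap<Integer, String> replicas,
                                        Set<Integer> newAssignment) {
        // 1. add/keep every replica in the new assignment
        for (Integer id : newAssignment) {
            replicas.putIfAbsent(id, "replica-" + id);
        }
        // 2. remove only the replicas that are no longer assigned
        Set<Integer> removed = new HashSet<>(replicas.keySet());
        removed.removeAll(newAssignment);
        for (Integer id : removed) {
            replicas.remove(id);
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, String> m = new ConcurrentHashMap<>();
        updateAssignment(m, Set.of(1, 2, 3));
        updateAssignment(m, Set.of(2, 3, 4));
        System.out.println(m.keySet()); // keys 2, 3, 4 in some order
    }
}
```

During the update a reader may briefly see the union of the old and new assignments, but never the empty set, which is the failure mode the ticket describes for callers such as `Partition#getReplicaOrException`.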
[GitHub] [kafka] stanislavkozlovski commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663065197 cc @ijuma This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10303) kafka producer says connect failed in cluster mode
Yogesh BG created KAFKA-10303: - Summary: kafka producer says connect failed in cluster mode Key: KAFKA-10303 URL: https://issues.apache.org/jira/browse/KAFKA-10303 Project: Kafka Issue Type: Bug Reporter: Yogesh BG
Hi, I am using Kafka broker version 2.3.0. We have two setups: standalone (one node) and a 3-node cluster. We pump huge data, ~25 MBps, ~80K messages per second. It all works well in one-node mode, but in the cluster case the producer starts throwing connect failed (librdkafka); after some time it is able to connect again and starts sending traffic. What could be the issue? Some of the configurations are:
replica.fetch.max.bytes=10485760
num.network.threads=12
num.replica.fetchers=6
queued.max.requests=5
# The number of threads doing disk I/O
num.io.threads=12
replica.socket.receive.buffer.bytes=1
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
mimaison commented on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-663094946 While not part of your changes, I noticed the test's assumptions are pretty loose. For example, we assume https://github.com/apache/kafka/pull/9029/files#diff-a03d58195cfe119d0b1ed2693cd0d691L362 always consumes all 100 messages. The test also assumes there are no duplicates. While this may be fine when running in memory, Connect semantics are at-least-once. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9052: MINOR: TopologyTestDriver should not require dummy parameters
mjsax commented on pull request #9052: URL: https://github.com/apache/kafka/pull/9052#issuecomment-663107263 Only flaky tests failed. I updated the corresponding Jira tickets. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163749#comment-17163749 ] Matthias J. Sax commented on KAFKA-9013: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7491/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testReplication/] > Flaky Test MirrorConnectorsIntegrationTest#testReplication > -- > > Key: KAFKA-9013 > URL: https://issues.apache.org/jira/browse/KAFKA-9013 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Bruno Cadonna >Priority: Major > Labels: flaky-test > > h1. Stacktrace: > {code:java} > java.lang.AssertionError: Condition not met within timeout 2. Offsets not > translated downstream to primary cluster. > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239) > {code} > h1. Standard Error > {code} > Standard Error > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be > ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implement any provider interfaces applicable in > the SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.RootResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. > Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors > WARNING: The following warnings have been detected: WARNING: The > (sub)resource method listLoggers in > org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectors in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method createConnector in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectorPlugins in > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > contains empty path annotation. > WARNING: The (sub)resource method serverInfo in > org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty > path annotation. 
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implement any provider interfaces applicable in > the SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be > ignored. > Oct 09, 2019 11:32:02 PM
[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
[ https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163747#comment-17163747 ] Matthias J. Sax commented on KAFKA-10255: - Two more: * [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1630/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testOneWayReplicationWithAutorOffsetSync1/] * [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3480/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testOneWayReplicationWithAutorOffsetSync1/] > Fix flaky testOneWayReplicationWithAutorOffsetSync1 test > > > Key: KAFKA-10255 > URL: https://issues.apache.org/jira/browse/KAFKA-10255 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0 > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 STARTED > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1 > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 FAILED > java.lang.AssertionError: consumer record size is not zero expected:<0> but > was:<2> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:647) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 edited a comment on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-662810186 > Could you rebase again? I will run the system tests after that. Thanks. Done. The known flaky ```group_mode_transactions_test.py``` is tracked by #9059. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress
[ https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163770#comment-17163770 ] Matthias J. Sax commented on KAFKA-8582: Added you to the list of contributors and assigned the ticket to you. You can now also self-assign tickets. > Consider adding an ExpiredWindowRecordHandler to Suppress > - > > Key: KAFKA-8582 > URL: https://issues.apache.org/jira/browse/KAFKA-8582 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Igor Piddubnyi >Priority: Major > > I got some feedback on Suppress: > {quote}Specifying how to handle events outside the grace period does seem > like a business concern, and simply discarding them thus seems risky (for > example imagine any situation where money is involved). > This sort of situation is addressed by the late-triggering approach > associated with watermarks > (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given > this I wondered if you were considering adding anything similar?{quote} > It seems like, if a record has arrived past the grace period for its window, > then the state of the windowed aggregation would already have been lost, so > if we were to compute an aggregation result, it would be incorrect. Plus, > since the window is already expired, we can't store the new (incorrect, but > more importantly expired) aggregation result either, so any subsequent > super-late records would also face the same blank-slate. I think this would > wind up looking like this: if you have three timely records for a window, and > then three more that arrive after the grace period, and you were doing a > count aggregation, you'd see the counts emitted for the window as [1, 2, 3, > 1, 1, 1]. I guess we could add a flag to the post-expiration results to > indicate that they're broken, but this seems like the wrong approach. 
The > post-expiration aggregation _results_ are meaningless, but I could see > wanting to send the past-expiration _input records_ to a dead-letter queue or > something instead of dropping them. > Along this line of thinking, I wonder if we should add an optional > past-expiration record handler interface to the suppression operator. Then, > you could define your own logic, whether it's a dead-letter queue, sending it > to some alerting pipeline, or even just crashing the application before it > can do something wrong. This would be a similar pattern to how we allow > custom logic to handle deserialization errors by supplying a > org.apache.kafka.streams.errors.DeserializationExceptionHandler. -- This message was sent by Atlassian Jira (v8.3.4#803005)
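For illustration, the past-expiration record handler proposed above could take a shape like the following sketch, modeled on `DeserializationExceptionHandler`. None of these names exist in Kafka Streams; the record types are simplified to strings:

```java
// Hypothetical sketch of the past-expiration record handler discussed
// in this ticket. Modeled on Kafka Streams' pattern of pluggable
// exception handlers; all names here are invented for illustration.
public class ExpiredWindowHandlerSketch {
    public enum ExpiredWindowHandlerResponse { CONTINUE, FAIL }

    public interface ExpiredWindowRecordHandler {
        // Called instead of silently dropping a record that arrived
        // after its window's grace period has elapsed.
        ExpiredWindowHandlerResponse handle(String key, String value, long recordTimestamp);
    }

    // Example implementation: log and keep processing, analogous to the
    // log-and-continue handler for deserialization errors.
    public static class LogAndContinueHandler implements ExpiredWindowRecordHandler {
        @Override
        public ExpiredWindowHandlerResponse handle(String key, String value, long ts) {
            System.out.println("Dropping expired record key=" + key + " ts=" + ts);
            return ExpiredWindowHandlerResponse.CONTINUE;
        }
    }

    public static void main(String[] args) {
        ExpiredWindowRecordHandler handler = new LogAndContinueHandler();
        handler.handle("k", "v", 123L);
    }
}
```

A dead-letter-queue implementation would instead forward the key/value to a side topic before returning CONTINUE, while a strict deployment could return FAIL to crash before anything incorrect is emitted.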
[jira] [Assigned] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress
[ https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-8582: -- Assignee: Igor Piddubnyi > Consider adding an ExpiredWindowRecordHandler to Suppress > - > > Key: KAFKA-8582 > URL: https://issues.apache.org/jira/browse/KAFKA-8582 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Igor Piddubnyi >Priority: Major > > I got some feedback on Suppress: > {quote}Specifying how to handle events outside the grace period does seem > like a business concern, and simply discarding them thus seems risky (for > example imagine any situation where money is involved). > This sort of situation is addressed by the late-triggering approach > associated with watermarks > (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given > this I wondered if you were considering adding anything similar?{quote} > It seems like, if a record has arrived past the grace period for its window, > then the state of the windowed aggregation would already have been lost, so > if we were to compute an aggregation result, it would be incorrect. Plus, > since the window is already expired, we can't store the new (incorrect, but > more importantly expired) aggregation result either, so any subsequent > super-late records would also face the same blank-slate. I think this would > wind up looking like this: if you have three timely records for a window, and > then three more that arrive after the grace period, and you were doing a > count aggregation, you'd see the counts emitted for the window as [1, 2, 3, > 1, 1, 1]. I guess we could add a flag to the post-expiration results to > indicate that they're broken, but this seems like the wrong approach. The > post-expiration aggregation _results_ are meaningless, but I could see > wanting to send the past-expiration _input records_ to a dead-letter queue or > something instead of dropping them. 
> Along this line of thinking, I wonder if we should add an optional > past-expiration record handler interface to the suppression operator. Then, > you could define your own logic, whether it's a dead-letter queue, sending it > to some alerting pipeline, or even just crashing the application before it > can do something wrong. This would be a similar pattern to how we allow > custom logic to handle deserialization errors by supplying a > org.apache.kafka.streams.errors.DeserializationExceptionHandler. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163796#comment-17163796 ] Sophie Blee-Goldman commented on KAFKA-10284: - You know, I think we actually hit this too, but weren't able to recognize the problem at the time. A few weeks ago one of our StreamThreads/Consumers seemed to "take off" from the group at some point, as evidenced by the steadily increasing last-rebalance-seconds-ago metric (whereas the other members had rebalanced multiple times since then). Right before this occurred we saw that same error message in the logs: {code:java} ERROR given member.id X is identified as a known static member 1,but not matching the expected member.id Y (kafka.coordinator.group.GroupMetadata) {code} Unfortunately we killed the client trying to remotely debug it, so we couldn't get any more useful information. Would you say that this mysterious encounter was likely due to the bug reported here? [~guozhang] [~bchen225242] > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.6.1 > > > For known static members rejoin, we would update its corresponding member.id > without triggering a new rebalance. This serves the purpose for avoiding > unnecessary rebalance for static membership, as well as fencing purpose if > some still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. > The bug find credit goes to [~hachikuji] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163871#comment-17163871 ] Boyang Chen commented on KAFKA-10284: - [~akshaysh] I didn't see any trace that the group coordinator gets migrated in the pasted ticket, so it might be a separate issue. [~ableegoldman] Well, the symptom matches, but I don't know for sure if this is the same cause :) > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.6.1 > > > For known static members rejoin, we would update its corresponding member.id > without triggering a new rebalance. This serves the purpose for avoiding > unnecessary rebalance for static membership, as well as fencing purpose if > some still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. > The bug find credit goes to [~hachikuji] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9161) Close gaps in Streams configs documentation
[ https://issues.apache.org/jira/browse/KAFKA-9161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9161: --- Fix Version/s: 2.6.0 > Close gaps in Streams configs documentation > --- > > Key: KAFKA-9161 > URL: https://issues.apache.org/jira/browse/KAFKA-9161 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Labels: beginner, newbie, newbie++ > Fix For: 2.6.0 > > > There are a number of Streams configs that aren't documented in the > "Configuring a Streams Application" section of the docs. As of 2.3 the > missing configs are: > # default.windowed.key.serde.inner ^ > # default.windowed.value.serde.inner ^ > # max.task.idle.ms > # rocksdb.config.setter. ^^ > # topology.optimization > # -upgrade.from- fixed > ^ these configs are also missing the corresponding DOC string > ^^ this one actually does appear on that page, but instead of being included > in the list of Streams configs it is for some reason under "Consumer and > Producer Configuration Parameters" ? > There are also a few configs whose documented name is slightly incorrect, as > it is missing the "default" prefix that appears in the actual code. The > "missing-default" configs are: > # key.serde > # value.serde > # timestamp.extractor -- This message was sent by Atlassian Jira (v8.3.4#803005)
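The "missing-default" naming discrepancy called out in the ticket above can be sketched with a plain `java.util.Properties` example: the keys carry the `default.` prefix used in the actual code (per the ticket), while the docs listed them without it. The serde/extractor class values below are placeholders:

```java
import java.util.Properties;

// Sketch of the "missing-default" configs from KAFKA-9161: the
// documented names (key.serde, value.serde, timestamp.extractor) lack
// the "default." prefix that the real config keys carry.
public class StreamsConfigNames {
    public static Properties docsVsCode() {
        Properties props = new Properties();
        // real config keys are prefixed with "default."
        props.put("default.key.serde",
                  "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde",
                  "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.timestamp.extractor",
                  "org.apache.kafka.streams.processor.FailOnInvalidTimestamp");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(docsVsCode().stringPropertyNames());
    }
}
```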
[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459588501

## File path: core/src/main/scala/kafka/cluster/Partition.scala
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
                 isr: Set[Int],
                 addingReplicas: Seq[Int],
                 removingReplicas: Seq[Int]): Unit = {
-    remoteReplicasMap.clear()
+    val replicaSet = assignment.toSet
+    val removedReplicas = remoteReplicasMap.keys -- replicaSet
     assignment
       .filter(_ != localBrokerId)
       .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))
-
+    removedReplicas.foreach(remoteReplicasMap.remove)

Review comment: Oh, this is a `Pool`, so we would have to add a `removeAll` method. Seems easy enough though, since it can call the relevant method in `ConcurrentMap`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163761#comment-17163761 ] Guozhang Wang commented on KAFKA-10134: --- BTW, I found that the main latency during rebalance is in discovering the coordinator: we keep getting "Join group failed with org.apache.kafka.common.errors.DisconnectException" and it kept retrying for about a minute. But I think you did not shut down the broker in your experiment; is there anything else happening that caused that broker node to be unreachable? > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > Attachments: consumer5.log.2020-07-22.log > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. 
> > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies
mjsax commented on pull request #9064: URL: https://github.com/apache/kafka/pull/9064#issuecomment-663113463 Btw: The PR should be against `trunk`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9017: KAFKA-8582 [WIP] Add ability to handle late messages in streams-aggregation
mjsax commented on pull request #9017: URL: https://github.com/apache/kafka/pull/9017#issuecomment-663122659 I found your wiki account and granted write access. You should be all set. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10000) Atomic commit of source connector records and offsets
[ https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163802#comment-17163802 ] Ning Zhang commented on KAFKA-10000: Hi Chris, the purpose of this ticket is very interesting. I wonder what its priority is in the overall Kafka Connect backlog, and how progress is going so far (needs-KIP)? Thanks > Atomic commit of source connector records and offsets > - > > Key: KAFKA-10000 > URL: https://issues.apache.org/jira/browse/KAFKA-10000 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Labels: needs-kip > > It'd be nice to be able to configure source connectors such that their > offsets are committed if and only if all records up to that point have been > ack'd by the producer. This would go a long way towards EOS for source > connectors. > > This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is > marked as {{WONTFIX}} since it only concerns enabling the idempotent producer > for source connectors and is not concerned with source connector offsets. > This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, > which had a lot of discussion around allowing connector-defined transaction > boundaries. The suggestion in this ticket is to only use source connector > offset commits as the transaction boundaries for connectors; allowing > connector-specified transaction boundaries can be addressed separately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
mjsax commented on pull request #9060: URL: https://github.com/apache/kafka/pull/9060#issuecomment-663135369 Jenkins failed on known flaky tests only. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7540) Flaky Test ConsumerBounceTest#testClose
[ https://issues.apache.org/jira/browse/KAFKA-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163803#comment-17163803 ] Matthias J. Sax commented on KAFKA-7540: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3496/testReport/junit/kafka.api/ConsumerBounceTest/testClose/] {quote}java.lang.AssertionError: Assignment did not complete on time at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.assertTrue(Assert.java:42) at kafka.api.ConsumerBounceTest.checkClosedState(ConsumerBounceTest.scala:486) at kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:257) at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:220){quote} > Flaky Test ConsumerBounceTest#testClose > --- > > Key: KAFKA-7540 > URL: https://issues.apache.org/jira/browse/KAFKA-7540 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 2.2.0 >Reporter: John Roesler >Assignee: Jason Gustafson >Priority: Critical > Labels: flaky-test > Fix For: 2.7.0, 2.6.1 > > > Observed on Java 8: > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/] > > Stacktrace: > {noformat} > java.lang.ArrayIndexOutOfBoundsException: -1 > at > kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146) > at > kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238) > at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at >
[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
[ https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163804#comment-17163804 ] Matthias J. Sax commented on KAFKA-10255: - And one more: [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1645/] > Fix flaky testOneWayReplicationWithAutorOffsetSync1 test > > > Key: KAFKA-10255 > URL: https://issues.apache.org/jira/browse/KAFKA-10255 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0 > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 STARTED > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1 > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 FAILED > java.lang.AssertionError: consumer record size is not zero expected:<0> but > was:<2> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:647) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] soondenana commented on a change in pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log
soondenana commented on a change in pull request #9054: URL: https://github.com/apache/kafka/pull/9054#discussion_r459603473 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -1151,6 +1144,12 @@ class LogManager(logDirs: Seq[File], } } } + + private def removeLogAndMetrics(logs: Pool[TopicPartition, Log], tp: TopicPartition): Log = { +val removedLog = logs.remove(tp) +if (removedLog != null) removedLog.removeLogMetrics() +removedLog Review comment: nit: Let's return `Option(removedLog)` to ease null checking by clients. It seems the same object gets returned by `asyncDelete` but is only used in one place in test code, so we may want to change the return value of that too. The less "null" we have the better. ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -199,27 +199,22 @@ class LogManager(logDirs: Seq[File], if (cleaner != null) cleaner.handleLogDirFailure(dir) - val offlineCurrentTopicPartitions = currentLogs.collect { -case (tp, log) if log.parentDir == dir => tp - } - offlineCurrentTopicPartitions.foreach { topicPartition => { -val removedLog = currentLogs.remove(topicPartition) -if (removedLog != null) { - removedLog.closeHandlers() - removedLog.removeLogMetrics() + def removeOfflineLogs(logs: Pool[TopicPartition, Log]): Iterable[TopicPartition] = { Review comment: Thanks for deduping this code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
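The Option-over-null suggestion above can be sketched in plain Java, with a `ConcurrentHashMap` standing in for Kafka's `Pool` and all names illustrative rather than the actual `LogManager` internals:

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a pool of logs keyed by partition; wrapping the
// result of remove() in Optional moves the null check into the type system.
class LogPool<K, V> {
    private final ConcurrentHashMap<K, V> logs = new ConcurrentHashMap<>();

    void add(K key, V log) {
        logs.put(key, log);
    }

    // Returns Optional.empty() when no log was registered for the key,
    // instead of a raw null that every caller must remember to check.
    Optional<V> removeLog(K key) {
        return Optional.ofNullable(logs.remove(key));
    }
}
```

A caller could then write something like `removeLog(tp).ifPresent(log -> log.removeMetrics())` with no explicit null check, which is the ergonomic benefit the review comment is after.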
[GitHub] [kafka] abbccdda commented on a change in pull request #8512: KAFKA-6024: Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()
abbccdda commented on a change in pull request #8512: URL: https://github.com/apache/kafka/pull/8512#discussion_r459648674 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -1388,9 +1388,9 @@ public void commitSync() { */ @Override public void commitSync(Duration timeout) { +maybeThrowInvalidGroupIdException(); Review comment: If we throw here, we will not execute the current `finally` block to call `release()`.
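The ordering concern above can be illustrated with a minimal sketch (illustrative names, not the actual `KafkaConsumer` fields): when validation runs before the acquire/try/finally sequence, a thrown exception never enters the try block, so the finally-based `release()` is skipped, which is consistent only because nothing was acquired yet.

```java
// Minimal sketch of the acquire/validate/release pattern under discussion.
class GuardedClient {
    private int refCount = 0;

    private void validateGroupId(String groupId) {
        if (groupId == null)
            throw new IllegalStateException("group.id must be configured");
    }

    private void acquire() { refCount++; }
    private void release() { refCount--; }

    void commitSync(String groupId) {
        // Validating before acquire(): if this throws, we never enter the
        // try block below, so the finally-based release() does not run.
        // That is consistent here, because nothing was acquired yet.
        validateGroupId(groupId);
        acquire();
        try {
            // ... perform the commit ...
        } finally {
            release();
        }
    }

    int refCount() { return refCount; }
}
```

The invariant to check is that the reference count ends at zero on both the success path and the early-throw path.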
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459596274 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition, isr: Set[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int]): Unit = { -remoteReplicasMap.clear() +val replicaSet = assignment.toSet +val removedReplicas = remoteReplicasMap.keys -- replicaSet + assignment .filter(_ != localBrokerId) .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) - +removedReplicas.foreach(remoteReplicasMap.remove) Review comment: `remoteReplicasMap --= removedReplicas` doesn't compile: `remoteReplicasMap` uses Kafka's `Pool` class, which itself wraps a Java `Map`, and I don't think those support the `--=` operator.
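The add-then-remove order in the diff above can be sketched in plain Java (a `ConcurrentHashMap` stands in for Kafka's `Pool`, and values are simplified to strings): new replicas are inserted first, and only the departed ones are removed afterwards, so a concurrent lock-free reader never observes an empty map the way it could with clear()-then-repopulate.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ReplicaMap {
    // Stand-in for remoteReplicasMap; values would be Replica objects in Kafka.
    final ConcurrentHashMap<Integer, String> replicas = new ConcurrentHashMap<>();

    // Replicas present in both the old and new assignment stay visible for
    // the entire update; only replicas that left the assignment are removed.
    void updateAssignment(Set<Integer> newAssignment, int localBrokerId) {
        Set<Integer> removed = new HashSet<>(replicas.keySet());
        removed.removeAll(newAssignment);
        for (Integer id : newAssignment) {
            if (id != localBrokerId)
                replicas.putIfAbsent(id, "replica-" + id);
        }
        for (Integer id : removed)
            replicas.remove(id);
    }
}
```

With localBrokerId = 3 and an assignment flip from {3, 4, 5} to {1, 2, 4}, replica 4 stays in the map throughout the update, which is the intermediate-state guarantee the PR's test exercises.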
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459599939 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition, isr: Set[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int]): Unit = { -remoteReplicasMap.clear() +val replicaSet = assignment.toSet +val removedReplicas = remoteReplicasMap.keys -- replicaSet + assignment .filter(_ != localBrokerId) .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) - +removedReplicas.foreach(remoteReplicasMap.remove) Review comment: Sounds good to introduce the method!
[GitHub] [kafka] mjsax commented on pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
mjsax commented on pull request #9060: URL: https://github.com/apache/kafka/pull/9060#issuecomment-663134416 Only the `StreamsStandbyTask.test_standby_tasks_rebalance` system test failed, and it's known to be buggy.
[GitHub] [kafka] ning2008wisc commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
ning2008wisc commented on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-663147296 I totally agree that it may be better to revisit the tests in MM2, so I created a ticket https://issues.apache.org/jira/browse/KAFKA-10304 and assigned it to myself.
[jira] [Created] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2
Ning Zhang created KAFKA-10304: -- Summary: Revisit and improve the tests of MirrorMaker 2 Key: KAFKA-10304 URL: https://issues.apache.org/jira/browse/KAFKA-10304 Project: Kafka Issue Type: Test Components: KafkaConnect Reporter: Ning Zhang Assignee: Ning Zhang Due to the rapid development of Kafka MirrorMaker 2, its unit and integration tests were written to cover each individual feature, and some of them are simply copied from existing tests with small tweaks. It may be a good time to revisit and improve the tests, possibly along the following lines: (1) Are 100 messages enough for integration tests? (2) What about failures in the middle of integration tests? (3) Do we want to check other messages (e.g. checkpoint, heartbeat, offset sync) beyond the mirrored messages in integration tests?
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459645411 ## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala ## @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { +val active = new AtomicBoolean(true) +val replicaToCheck = 3 +val firstReplicaSet = Seq[Integer](3, 4, 5).asJava +val secondReplicaSet = Seq[Integer](1, 2, 3).asJava +def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState() + .setControllerEpoch(1) + .setLeader(replicas.get(0)) + .setLeaderEpoch(1) + .setIsr(replicas) + .setZkVersion(1) + .setReplicas(replicas) + .setIsNew(true) +val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) +// Update replica set synchronously first to avoid race conditions +partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints) +assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) + +var i = 0 +val future = executorService.submit((() => { + // Flip assignment between two replica sets + while (active.get) { +val replicas = if (i % 2 == 0) { + firstReplicaSet +} else { + secondReplicaSet +} + +partition.makeLeader(partitionState(replicas), offsetCheckpoints) + +i += 1 +Thread.sleep(1) // just to avoid tight loop + } +}): Runnable) + +val deadline = 5.seconds.fromNow Review comment: Lowered to 1s
[GitHub] [kafka] jeffkbkim commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
jeffkbkim commented on a change in pull request #9050: URL: https://github.com/apache/kafka/pull/9050#discussion_r459647273 ## File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala ## @@ -598,6 +603,86 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { }, "Broker fail to initialize after restart") } + @Test + def testPreemptionOnControllerShutdown(): Unit = { +servers = makeServers(1, enableControlledShutdown = false) +val controller = getController().kafkaController +val count = new AtomicInteger(2) +val latch = new CountDownLatch(1) +val spyThread = spy(controller.eventManager.thread) +controller.eventManager.setControllerEventThread(spyThread) +val processedEvent = new MockEvent(ControllerState.TopicChange) { + override def process(): Unit = latch.await() + override def preempt(): Unit = {} +} +val preemptedEvent = new MockEvent(ControllerState.TopicChange) { + override def process(): Unit = {} + override def preempt(): Unit = count.decrementAndGet() +} + +controller.eventManager.put(processedEvent) +controller.eventManager.put(preemptedEvent) +controller.eventManager.put(preemptedEvent) + +doAnswer((_: InvocationOnMock) => { + latch.countDown() +}).doCallRealMethod().when(spyThread).awaitShutdown() Review comment: @stanislavkozlovski thanks for the comment. I've tried this approach before and the test passes, but it sometimes outputs: ``` [2020-07-23 11:12:43,316] ERROR [RequestSendThread controllerId=0] Controller 0 fails to send a request to broker localhost:51542 (id: 0 rack: null) (kafka.controller.RequestSendThread:76) java.lang.InterruptedException ``` I also see a bit of flakiness with this approach, as we cannot exactly time when `latch.countDown()` is called.
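The timing issue described above (not knowing exactly when the blocked event is reached) is commonly avoided with a two-latch handshake; a generic sketch of that pattern, not the actual `ControllerIntegrationTest` code:

```java
import java.util.concurrent.CountDownLatch;

public class LatchHandshake {
    // Returns true once the worker has been released and joined.
    static boolean run() throws InterruptedException {
        // The worker signals "started" right before blocking, so the test
        // thread knows deterministically when the worker is inside its
        // critical section instead of guessing with sleeps.
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch proceed = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            started.countDown();   // known point: the worker is about to block
            try {
                proceed.await();   // stands in for the blocked process() call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        worker.start();
        started.await();      // resumes only after the worker reached the point
        proceed.countDown();  // release the worker at a deterministic moment
        worker.join();
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker finished: " + run());
    }
}
```

The second latch gives the test an exact handle on "the worker is now blocked", which removes the race the review comment describes around when `latch.countDown()` fires relative to the shutdown path.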
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459598439 ## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala ## @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { Review comment: This fails incredibly quickly 100/100 times without the Partition.scala changes.
[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663130390 ok to test
[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459607476 ## File path: core/src/test/scala/unit/kafka/utils/PoolTest.scala ## @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.utils Review comment: Remove `unit`.
[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
[ https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163807#comment-17163807 ] Matthias J. Sax commented on KAFKA-10255: - [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3497/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testOneWayReplicationWithAutorOffsetSync1/] > Fix flaky testOneWayReplicationWithAutorOffsetSync1 test > > > Key: KAFKA-10255 > URL: https://issues.apache.org/jira/browse/KAFKA-10255 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0 > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 STARTED > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1 > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 FAILED > java.lang.AssertionError: consumer record size is not zero expected:<0> but > was:<2> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:647) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)
[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on pull request #9047: URL: https://github.com/apache/kafka/pull/9047#issuecomment-663136110 Flaky tests only.
[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459616154 ## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala ## @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { +val active = new AtomicBoolean(true) +val replicaToCheck = 3 +val firstReplicaSet = Seq[Integer](3, 4, 5).asJava +val secondReplicaSet = Seq[Integer](1, 2, 3).asJava +def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState() + .setControllerEpoch(1) + .setLeader(replicas.get(0)) + .setLeaderEpoch(1) + .setIsr(replicas) + .setZkVersion(1) + .setReplicas(replicas) + .setIsNew(true) +val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) +// Update replica set synchronously first to avoid race conditions +partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints) +assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) + +var i = 0 Review comment: Shouldn't this be inside the thread state? ## File path: core/src/main/scala/kafka/utils/Pool.scala ## @@ -69,6 +69,8 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { def remove(key: K, value: V): Boolean = pool.remove(key, value) + def removeAll(keys: Iterable[K]): Unit = pool.keySet().removeAll(keys.asJavaCollection) Review comment: Nit: `()` is not needed. 
## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala ## @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { +val active = new AtomicBoolean(true) +val replicaToCheck = 3 +val firstReplicaSet = Seq[Integer](3, 4, 5).asJava +val secondReplicaSet = Seq[Integer](1, 2, 3).asJava +def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState() Review comment: No need to repeat `LeaderAndIsrPartitionState` twice. ## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala ## @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { +val active = new AtomicBoolean(true) +val replicaToCheck = 3 +val firstReplicaSet = Seq[Integer](3, 4, 5).asJava +val secondReplicaSet = Seq[Integer](1, 2, 3).asJava +def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState() + .setControllerEpoch(1) + .setLeader(replicas.get(0)) + .setLeaderEpoch(1) + .setIsr(replicas) + .setZkVersion(1) + .setReplicas(replicas) + .setIsNew(true) +val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) +// Update replica set synchronously first to avoid race conditions 
+partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints) +assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) + +var i = 0 +val future = executorService.submit((() => { + // Flip assignment between two replica sets + while (active.get) { +val replicas = if (i % 2 == 0) { + firstReplicaSet +} else { + secondReplicaSet +} + +partition.makeLeader(partitionState(replicas), offsetCheckpoints) + +i += 1 +Thread.sleep(1) // just to avoid tight loop + } +}): Runnable) + +val deadline = 5.seconds.fromNow +while(deadline.hasTimeLeft()) { Review comment: Nit: space missing after `while`. ## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala