[jira] [Updated] (KAFKA-12454) Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
[ https://issues.apache.org/jira/browse/KAFKA-12454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbing Shen updated KAFKA-12454: - Affects Version/s: 2.8.0 > Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in > current kafka cluster > --- > > Key: KAFKA-12454 > URL: https://issues.apache.org/jira/browse/KAFKA-12454 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.8.0 >Reporter: Wenbing Shen >Assignee: Wenbing Shen >Priority: Minor > > When non-existent brokerId values are given, the kafka-log-dirs tool fails > with a timeout error: > Exception in thread "main" java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node > assignment. Call: describeLogDirs > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at kafka.admin.LogDirsCommand$.describe(LogDirsCommand.scala:50) > at kafka.admin.LogDirsCommand$.main(LogDirsCommand.scala:36) > at kafka.admin.LogDirsCommand.main(LogDirsCommand.scala) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting > for a node assignment. Call: describeLogDirs > > When the brokerId entered by the user does not exist, an error message > indicating that the node is not present should be printed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
wenbingshen commented on a change in pull request #10304: URL: https://github.com/apache/kafka/pull/10304#discussion_r593706745 ## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -40,9 +40,16 @@ object LogDirsCommand { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) +val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray +case None => clusterBrokers +} + +val nonExistBrokers: Array[Int] = brokerList.filterNot(brokerId => clusterBrokers.contains(brokerId)) +if (!nonExistBrokers.isEmpty) { + System.err.println(s"The given node(s) does not exist from broker-list ${nonExistBrokers.mkString(",")}") + sys.exit(1) Review comment: > Should we do this only when handling the brokers provided by the user? It does not make sense to validate the list of brokers otherwise. What do you think? Your suggestion is very good. I have changed the logic; please review. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
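The reviewer's suggestion (validate only the broker IDs the user passed, never the default list fetched from `describeCluster()`) can be sketched as a pure function. This is a minimal Java sketch, assuming the cluster's broker IDs have already been fetched; `BrokerListValidation` and its methods are hypothetical names, not part of `LogDirsCommand`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not part of LogDirsCommand): validates only the broker IDs
// the user passed explicitly; the default "all brokers in the cluster" list is never checked.
class BrokerListValidation {

    // Splits a comma-separated list and skips empty entries, like split(",").filter(!_.isEmpty).
    static List<Integer> parseBrokerList(String brokerListStr) {
        List<Integer> ids = new ArrayList<>();
        for (String s : brokerListStr.split(",")) {
            if (!s.isEmpty()) {
                ids.add(Integer.parseInt(s));
            }
        }
        return ids;
    }

    // Returns the user-supplied IDs that are not present in the cluster, sorted ascending.
    static Set<Integer> nonExistentBrokers(Optional<String> userBrokerList, Set<Integer> clusterBrokers) {
        Set<Integer> missing = new TreeSet<>();
        if (!userBrokerList.isPresent()) {
            return missing; // no user input: all cluster brokers are used, nothing to validate
        }
        for (int id : parseBrokerList(userBrokerList.get())) {
            if (!clusterBrokers.contains(id)) {
                missing.add(id);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Set<Integer> cluster = new TreeSet<>(Arrays.asList(0, 1, 2));
        Set<Integer> missing = nonExistentBrokers(Optional.of("1,5,,7"), cluster);
        if (!missing.isEmpty()) {
            // The PR prints to stderr and exits with status 1; this sketch only prints.
            System.err.println("ERROR: broker(s) " + missing + " do not exist in the cluster");
        }
    }
}
```

Keeping the check separate from the Admin client call makes it easy to unit test without a running cluster, which is in the spirit of the test the reviewer asked for.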
[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
wenbingshen commented on pull request #10304: URL: https://github.com/apache/kafka/pull/10304#issuecomment-797884025 > Thanks for the PR. I left a minor suggestion. Could we also add a test case? Thanks for your comment. Your suggestion is very good: we only need to validate the brokers entered by the user. I have added a unit test; please review it again. Thank you.
[jira] [Commented] (KAFKA-12426) Missing logic to create partition.metadata files in RaftReplicaManager
[ https://issues.apache.org/jira/browse/KAFKA-12426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300726#comment-17300726 ] Jason Gustafson commented on KAFKA-12426: - [~jolshan] Thanks, good find. Since it is not a regression, it's tough to call it a blocker. I'd suggest we target 2.8.1 for now, but if the patch is not too crazy, we might still be able to get it in depending on RC progress. > Missing logic to create partition.metadata files in RaftReplicaManager > -- > > Key: KAFKA-12426 > URL: https://issues.apache.org/jira/browse/KAFKA-12426 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Justine Olshan >Priority: Major > Labels: kip-500 > > As part of KIP-516, the broker should create a partition.metadata file for > each partition in order to keep track of the topicId. This is done through > `Partition.checkOrSetTopicId`. We have the logic for this in > `ReplicaManager.becomeLeaderOrFollower` already, but we need to implement > analogous logic in `RaftReplicaManager.handleMetadataRecords`.
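The "check or set" idea behind `Partition.checkOrSetTopicId` can be illustrated with a simplified in-memory model. This sketch assumes "record the ID if none is stored, otherwise compare" semantics; `PartitionTopicIdModel` is hypothetical, uses `java.util.UUID` as a stand-in for the broker's topic-ID type, and does not write a real partition.metadata file.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical, simplified model of "check or set" topic-ID semantics (not the broker's Partition class).
// The real broker persists the ID in partition.metadata; this sketch keeps it in memory.
class PartitionTopicIdModel {
    private Optional<UUID> storedTopicId = Optional.empty();

    // Records the ID if none is stored yet; otherwise returns whether it matches the stored one.
    synchronized boolean checkOrSetTopicId(UUID requestTopicId) {
        if (!storedTopicId.isPresent()) {
            storedTopicId = Optional.of(requestTopicId); // real code would also write the metadata file here
            return true;
        }
        return storedTopicId.get().equals(requestTopicId);
    }

    synchronized Optional<UUID> topicId() {
        return storedTopicId;
    }

    public static void main(String[] args) {
        PartitionTopicIdModel partition = new PartitionTopicIdModel();
        UUID id = UUID.randomUUID();
        System.out.println(partition.checkOrSetTopicId(id));                // true: first call stores the ID
        System.out.println(partition.checkOrSetTopicId(id));                // true: same ID matches
        System.out.println(partition.checkOrSetTopicId(UUID.randomUUID())); // false: different ID is rejected
    }
}
```

The issue's point is that both code paths (`becomeLeaderOrFollower` and `handleMetadataRecords`) need to run this same step; the model only shows what that step decides.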
[GitHub] [kafka] ableegoldman commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN
ableegoldman commented on pull request #10311: URL: https://github.com/apache/kafka/pull/10311#issuecomment-797862761 Merged to trunk and cherry-picked to 2.7 & 2.8 cc @vvcephei
[GitHub] [kafka] ableegoldman merged pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN
ableegoldman merged pull request #10311: URL: https://github.com/apache/kafka/pull/10311
[GitHub] [kafka] ableegoldman commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN
ableegoldman commented on pull request #10311: URL: https://github.com/apache/kafka/pull/10311#issuecomment-797862288 One unrelated test failure: `kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()` Going to merge.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10300: KAFKA-12452: Remove deprecated overloads of ProcessorContext#forward
ableegoldman commented on a change in pull request #10300: URL: https://github.com/apache/kafka/pull/10300#discussion_r593553762 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java ## @@ -155,29 +155,20 @@ public void shouldReturnEmptyHeadersIfHeadersAreNotSet() { assertThat(context.headers(), is(emptyIterable())); } -@Test -public void shouldThrowIllegalStateExceptionOnHeadersIfNoRecordContext() { Review comment: Just want to make sure I understand the background here: as of KIP-478 we no longer throw if there's no active RecordContext, but instead just return empty. Is that right? If so, then yes, an updated test like `shouldReturnEmptyHeaderIfRecordContextIsNull` seems good to have.
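The behavior being confirmed here (return empty headers instead of throwing when no record context is active) can be sketched with hypothetical stand-in classes. `ProcessorContextSketch` and `RecordContextSketch` are illustrative only, not the Kafka Streams `AbstractProcessorContext`, and a `List<String>` stands in for the `Headers` type.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for a processor context and its record context (not Kafka Streams classes);
// a List<String> stands in for the Headers type.
class ProcessorContextSketch {

    static final class RecordContextSketch {
        final List<String> headers;

        RecordContextSketch(List<String> headers) {
            this.headers = headers;
        }
    }

    private RecordContextSketch recordContext; // null when no record is currently being processed

    void setRecordContext(RecordContextSketch recordContext) {
        this.recordContext = recordContext;
    }

    // Returns empty headers, instead of throwing, when there is no active record context.
    List<String> headers() {
        if (recordContext == null) {
            return Collections.emptyList();
        }
        return recordContext.headers;
    }

    public static void main(String[] args) {
        ProcessorContextSketch ctx = new ProcessorContextSketch();
        System.out.println(ctx.headers().isEmpty()); // prints "true": no record context, no exception
        ctx.setRecordContext(new RecordContextSketch(Arrays.asList("trace-id")));
        System.out.println(ctx.headers()); // prints "[trace-id]"
    }
}
```

A test in the style of `shouldReturnEmptyHeaderIfRecordContextIsNull` would simply assert the first case.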
[GitHub] [kafka] dengziming closed pull request #10189: MINOR: Update copyright year in NOTICE
dengziming closed pull request #10189: URL: https://github.com/apache/kafka/pull/10189
[GitHub] [kafka] dengziming commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot
dengziming commented on a change in pull request #10289: URL: https://github.com/apache/kafka/pull/10289#discussion_r593541172 ## File path: clients/src/main/resources/common/message/VoteRequest.json ## @@ -21,7 +21,7 @@ "validVersions": "0", "flexibleVersions": "0+", "fields": [ -{ "name": "ClusterId", "type": "string", "versions": "0+", +{ "name": "ClusterId", "type": "string", "versions": "0+", "ignorable": true, Review comment: This was checked in by mistake during testing; I will revert it.
[GitHub] [kafka] dengziming opened a new pull request #10312: MINOR: Fix log statement whose placeholders are inconsistent with arguments
dengziming opened a new pull request #10312: URL: https://github.com/apache/kafka/pull/10312 *More detailed description of your change* 1. When the 2nd argument is an exception, we don't need a placeholder for it. 2. The number of placeholders should equal the number of arguments. *Summary of testing strategy (including rationale)* QA ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
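Both rules in this PR follow from how SLF4J-style parameterized logging works: a trailing `Throwable` argument that has no matching `{}` is logged as the exception (with its stack trace), while every other argument needs its own `{}`. The lint-style check below is a hypothetical sketch of that rule, not part of SLF4J or Kafka, and for simplicity it ignores escaped `\{}` anchors.

```java
// Hypothetical lint-style helper, not part of SLF4J or Kafka.
class PlaceholderCheck {

    // Counts "{}" anchors in an SLF4J-style format string (escaped anchors are ignored for simplicity).
    static int countPlaceholders(String format) {
        int count = 0;
        int idx = format.indexOf("{}");
        while (idx >= 0) {
            count++;
            idx = format.indexOf("{}", idx + 2);
        }
        return count;
    }

    // A trailing Throwable is logged as the exception, so it should not consume a placeholder.
    static boolean placeholdersMatch(String format, Object... args) {
        int expected = args.length;
        if (args.length > 0 && args[args.length - 1] instanceof Throwable) {
            expected--;
        }
        return countPlaceholders(format) == expected;
    }

    public static void main(String[] args) {
        Exception e = new RuntimeException("boom");
        System.out.println(placeholdersMatch("Failed to close {}", "task-1", e));     // true
        System.out.println(placeholdersMatch("Failed to close {}: {}", "task-1", e)); // false: exception fills a placeholder
        System.out.println(placeholdersMatch("Assigned {} to {}", "p0"));            // false: missing argument
    }
}
```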
[GitHub] [kafka] ableegoldman commented on pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason
ableegoldman commented on pull request #10232: URL: https://github.com/apache/kafka/pull/10232#issuecomment-797844734 Failed with unrelated `connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining()` and `kafka.server.ScramServerStartupTest.testAuthentications()`
[GitHub] [kafka] mjsax commented on a change in pull request #10300: KAFKA-12452: Remove deprecated overloads of ProcessorContext#forward
mjsax commented on a change in pull request #10300: URL: https://github.com/apache/kafka/pull/10300#discussion_r593539158 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java ## @@ -37,7 +37,6 @@ public class ProcessorNode { -// TODO: 'children' can be removed when #forward() via index is removed Review comment: Cool. We can still change it later if we feel the need, as it's only internal stuff.
[GitHub] [kafka] mjsax commented on a change in pull request #10300: KAFKA-12452: Remove deprecated overloads of ProcessorContext#forward
mjsax commented on a change in pull request #10300: URL: https://github.com/apache/kafka/pull/10300#discussion_r593538979 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ## @@ -280,61 +280,6 @@ public void testDrivingSimpleTopology() { assertTrue(outputTopic1.isEmpty()); } - -@Test -public void testDrivingMultiplexingTopology() { Review comment: It's using the "old API" topology provided by `createMultiplexingTopology` (and, below, `createMultiplexByNameTopology`), i.e., `MultiplexByNameProcessor` and `MultiplexingProcessor`. I double-checked, and the multiplexing case is covered with the new API via `createMultiProcessorTimestampTopology`, which uses the `FanOutTimestampProcessor`, the "new" equivalent of the old (and removed) `MultiplexByNameProcessor`. (For multiplexing by index we don't need a new test, as we removed routing by index on purpose and it is no longer part of the new API.)
[GitHub] [kafka] mjsax commented on a change in pull request #10300: KAFKA-12452: Remove deprecated overloads of ProcessorContext#forward
mjsax commented on a change in pull request #10300: URL: https://github.com/apache/kafka/pull/10300#discussion_r593538020 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java ## @@ -155,29 +155,20 @@ public void shouldReturnEmptyHeadersIfHeadersAreNotSet() { assertThat(context.headers(), is(emptyIterable())); } -@Test -public void shouldThrowIllegalStateExceptionOnHeadersIfNoRecordContext() { Review comment: @ableegoldman Follow-up though: should we keep this test with the reverted logic, i.e. `shouldReturnEmptyHeaderIfRecordContextIsNull`?
[GitHub] [kafka] dengziming commented on pull request #10189: MINOR: Update copyright year in NOTICE
dengziming commented on pull request #10189: URL: https://github.com/apache/kafka/pull/10189#issuecomment-797842701 Closing this since it has been resolved in #10308.
[GitHub] [kafka] dengziming commented on pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`
dengziming commented on pull request #10243: URL: https://github.com/apache/kafka/pull/10243#issuecomment-797842164 > Thanks for the update. Have you tried to repeatedly run the test to verify that it resolves the issue? @dajac Of course, I ran it many times to verify; it only fails occasionally when I set waitTimeMs < 6000.
[GitHub] [kafka] dengziming commented on a change in pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`
dengziming commented on a change in pull request #10243: URL: https://github.com/apache/kafka/pull/10243#discussion_r593536795 ## File path: core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala ## @@ -476,14 +476,18 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { // New instance of consumer should be assigned partitions immediately and should see committed offsets. val assignSemaphore = new Semaphore(0) val consumer = createConsumerWithGroupId(groupId) -consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener { +consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener { def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = { assignSemaphore.release() } def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = { }}) -consumer.poll(time.Duration.ofSeconds(3L)) -assertTrue(assignSemaphore.tryAcquire(1, TimeUnit.SECONDS), "Assignment did not complete on time") + +TestUtils.waitUntilTrue(() => { + consumer.poll(time.Duration.ZERO) Review comment: Done!
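The change replaces a single `poll(3s)` followed by a one-second `tryAcquire` with a loop that keeps polling until the rebalance listener has released the semaphore. The generic wait-until pattern is sketched below; `WaitUntil.waitUntilTrue` is a hypothetical stand-in for the test utility (it returns a boolean rather than failing the test), and the consumer's poll is simulated by a background thread releasing the permit.

```java
import java.util.concurrent.Semaphore;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a test "wait until" utility; not Kafka's TestUtils.
class WaitUntil {

    // Re-evaluates the condition until it is true or the timeout elapses.
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return false;
            }
        }
    }

    public static void main(String[] args) {
        Semaphore assignSemaphore = new Semaphore(0);
        // Simulates the rebalance listener releasing the permit after a short delay.
        new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            assignSemaphore.release();
        }).start();

        // In the real test, each iteration would also call consumer.poll(Duration.ZERO) before checking.
        boolean assigned = waitUntilTrue(assignSemaphore::tryAcquire, 5_000, 50);
        System.out.println(assigned ? "Assignment completed" : "Assignment did not complete on time");
    }
}
```

Polling with a zero timeout inside the loop keeps the consumer's group protocol moving while the overall deadline, rather than a single fixed poll duration, bounds the wait.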
[GitHub] [kafka] hachikuji merged pull request #10310: KAFKA-12460; Do not allow raft truncation below high watermark
hachikuji merged pull request #10310: URL: https://github.com/apache/kafka/pull/10310
[jira] [Resolved] (KAFKA-12460) Raft should prevent truncation below high watermark
[ https://issues.apache.org/jira/browse/KAFKA-12460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12460. - Resolution: Fixed > Raft should prevent truncation below high watermark > --- > > Key: KAFKA-12460 > URL: https://issues.apache.org/jira/browse/KAFKA-12460 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Eventually we will have to come up with some approach to recover from > committed data loss in the raft quorum (something akin to unclean leader > election for normal partitions). For now, we would rather be stricter and > fail fast rather than allowing committed data to be silently lost. > Specifically, we can prevent any attempt to truncate below the high watermark > since this is a clear indication of data loss. The long-term thought I have > in mind is to give users an --unsafe flag or something like that which can be > passed at startup in order to knowingly turn off the stricter validation in > order to let the quorum recover from a disaster scenario. This needs some > more thought though.
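The rule in this issue reduces to a single comparison: truncating a log to offset X removes every record at offset X or above, and every record below the high watermark is committed, so any X below the high watermark would drop committed data. A minimal sketch of that guard follows; `TruncationGuard` is a hypothetical name (not the Raft client's API), the choice of `IllegalStateException` is an assumption, and the proposed `--unsafe` escape hatch is not modeled.

```java
// Hypothetical guard (names are illustrative, not the Raft client's API).
class TruncationGuard {

    // Truncating to `truncationOffset` removes every record at or above that offset.
    // Records below the high watermark are committed, so truncating below it means committed data loss.
    static void validateTruncation(long truncationOffset, long highWatermark) {
        if (truncationOffset < highWatermark) {
            throw new IllegalStateException("Refusing to truncate to offset " + truncationOffset
                + ", which is below the high watermark " + highWatermark);
        }
    }

    public static void main(String[] args) {
        validateTruncation(120L, 100L); // allowed: only uncommitted records are removed
        try {
            validateTruncation(80L, 100L);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```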
[GitHub] [kafka] wcarlson5 commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN
wcarlson5 commented on pull request #10311: URL: https://github.com/apache/kafka/pull/10311#issuecomment-797829955 LGTM
[GitHub] [kafka] ableegoldman commented on a change in pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN
ableegoldman commented on a change in pull request #10311: URL: https://github.com/apache/kafka/pull/10311#discussion_r593523423 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java ## @@ -87,7 +87,9 @@ public void onPartitionsRevoked(final Collection partitions) { taskManager.activeTaskIds(), taskManager.standbyTaskIds()); -if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) { +// We need to still invoke handleRevocation if the thread has been told to shut down, but we shouldn't ever +// transition away from PENDING_SHUTDOWN once it's been initiated (to anything other than DEAD) +if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) { Review comment: I think this is the correct order (assuming you mean the order of `streamThread.setState(State.PARTITIONS_REVOKED) != null` relative to `streamThread.state() == State.PENDING_SHUTDOWN`?) -- if the thread is not in PENDING_SHUTDOWN when it reaches this line, the first condition should return true, which is what we want even if it does get transitioned to PENDING_SHUTDOWN immediately after the transition to PARTITIONS_REVOKED.
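The ordering argument relies on Java's short-circuit `||`: `setState(...)` always runs first, and the `PENDING_SHUTDOWN` check is only consulted when that transition was ignored. The simplified model below illustrates this; `RevocationGuardSketch` is hypothetical, it assumes `setState` returns `null` for an ignored transition (as the quoted condition implies), and it models only the "only DEAD is a valid next state" restriction from KAFKA-12462, letting other states accept any transition.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;

// Simplified, hypothetical model of the check in StreamsRebalanceListener#onPartitionsRevoked.
// Only the PENDING_SHUTDOWN restriction quoted in KAFKA-12462 is modeled; other states accept any transition.
class RevocationGuardSketch {
    enum State { RUNNING, PARTITIONS_REVOKED, PENDING_SHUTDOWN, DEAD }

    private State state = State.RUNNING;

    State state() {
        return state;
    }

    // Returns the previous state on success, or null if the transition is ignored as invalid.
    State setState(State next) {
        Set<State> allowed;
        switch (state) {
            case PENDING_SHUTDOWN:
                allowed = EnumSet.of(State.DEAD); // "only DEAD state is a valid next state"
                break;
            case DEAD:
                allowed = EnumSet.noneOf(State.class);
                break;
            default:
                allowed = EnumSet.allOf(State.class);
        }
        if (!allowed.contains(next)) {
            return null;
        }
        State previous = state;
        state = next;
        return previous;
    }

    // True if handleRevocation would be invoked. setState(...) runs first: if it succeeds, || short-circuits;
    // if it is ignored because the thread is shutting down, the PENDING_SHUTDOWN check lets revocation proceed.
    boolean shouldHandleRevocation(Collection<String> partitions) {
        return (setState(State.PARTITIONS_REVOKED) != null || state() == State.PENDING_SHUTDOWN)
            && !partitions.isEmpty();
    }

    public static void main(String[] args) {
        RevocationGuardSketch running = new RevocationGuardSketch();
        System.out.println(running.shouldHandleRevocation(Arrays.asList("input-0"))); // true
        System.out.println(running.state()); // PARTITIONS_REVOKED

        RevocationGuardSketch shuttingDown = new RevocationGuardSketch();
        shuttingDown.setState(State.PENDING_SHUTDOWN);
        System.out.println(shuttingDown.shouldHandleRevocation(Arrays.asList("input-0"))); // true
        System.out.println(shuttingDown.state()); // PENDING_SHUTDOWN
    }
}
```

Either way revocation is handled, and a thread already in `PENDING_SHUTDOWN` never leaves that state.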
[GitHub] [kafka] ableegoldman commented on a change in pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN
ableegoldman commented on a change in pull request #10311: URL: https://github.com/apache/kafka/pull/10311#discussion_r593522324 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -714,6 +714,13 @@ void runOnce() { final long pollLatency = pollPhase(); +// Optimization to skip the rest of the processing loop in case the thread was requested to shut down during +// the poll phase Review comment: 🤦‍♀️ Oh wow, how did I not see that lol. I'll just bump the log to INFO.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason
ableegoldman commented on a change in pull request #10232: URL: https://github.com/apache/kafka/pull/10232#discussion_r593521930 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse, } } } else { -requestRejoin(); Review comment: Hmm... but `resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));` is only called in `joinGroupIfNeeded`, which is only called in `ensureActiveGroup`, which is in turn only invoked in `ConsumerCoordinator#poll`. That said, inside `SyncGroupResponseHandler#handle` we would already have `rejoinNeeded = true` and only set it to false if the SyncGroup succeeds. So for that reason I guess we don't need the `requestRejoin` anywhere inside the SyncGroup handler.
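The conclusion rests on a simple invariant: the rejoin flag is already true while the SyncGroup is in flight and is cleared only on success, so setting it again on failure changes nothing. A toy model of that invariant follows; `RejoinFlagSketch` is hypothetical and far simpler than the real coordinator.

```java
// Toy model of the rejoin-flag invariant described in this review; not the AbstractCoordinator code.
class RejoinFlagSketch {
    private boolean rejoinNeeded = true; // already true while a rebalance is in progress

    void requestRejoin() {
        rejoinNeeded = true;
    }

    // Only a successful SyncGroup clears the flag; a failed one leaves it as it was.
    void onSyncGroupResponse(boolean success) {
        if (success) {
            rejoinNeeded = false;
        }
    }

    boolean rejoinNeeded() {
        return rejoinNeeded;
    }

    public static void main(String[] args) {
        RejoinFlagSketch withoutCall = new RejoinFlagSketch();
        withoutCall.onSyncGroupResponse(false);

        RejoinFlagSketch withCall = new RejoinFlagSketch();
        withCall.onSyncGroupResponse(false);
        withCall.requestRejoin(); // the call the PR removes from the failure path

        // Both print true: the extra requestRejoin() does not change the outcome.
        System.out.println(withoutCall.rejoinNeeded());
        System.out.println(withCall.rejoinNeeded());
    }
}
```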
[GitHub] [kafka] wcarlson5 commented on a change in pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN
wcarlson5 commented on a change in pull request #10311: URL: https://github.com/apache/kafka/pull/10311#discussion_r593517811 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java ## @@ -87,7 +87,9 @@ public void onPartitionsRevoked(final Collection partitions) { taskManager.activeTaskIds(), taskManager.standbyTaskIds()); -if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) { +// We need to still invoke handleRevocation if the thread has been told to shut down, but we shouldn't ever +// transition away from PENDING_SHUTDOWN once it's been initiated (to anything other than DEAD) +if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) { Review comment: Do we need to be concerned about the order in which these execute? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -714,6 +714,13 @@ void runOnce() { final long pollLatency = pollPhase(); +// Optimization to skip the rest of the processing loop in case the thread was requested to shut down during +// the poll phase Review comment: Good idea, but I think we already do this a few lines down.
[jira] [Commented] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception
[ https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300654#comment-17300654 ] A. Sophie Blee-Goldman commented on KAFKA-12462: The only real downside in 2.7 is that we won't properly clean up the task, i.e., we'll skip committing the offsets and writing the checkpoint. So we'd lose any work we did since the last commit – for EOS this would be a perf hit since we'd probably need to restore the state stores from scratch after starting back up, whereas for ALOS we could get some overcounting. Not the end of the world, but worth fixing if we can. > Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state > exception > -- > > Key: KAFKA-12462 > URL: https://issues.apache.org/jira/browse/KAFKA-12462 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Walker Carlson >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Labels: streams > Fix For: 2.8.0, 2.7.1 > > > A thread was removed, sending it to the PENDING_SHUTDOWN state, but went > through a rebalance before completing the shutdown. > {code:java} > // [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] > stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to > transit from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a > valid next state (org.apache.kafka.streams.processor.internals.StreamThread) > {code} > Inside StreamsRebalanceListener#onPartitionsRevoked, we have > {code:java} > // > if (streamThread.setState(State.PARTITIONS_REVOKED) != null && > !partitions.isEmpty()) > taskManager.handleRevocation(partitions); > {code} > Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we > never invoke TaskManager#handleRevocation. Currently handleRevocation is > responsible for preparing any active tasks for close, including committing > offsets and writing the checkpoint as well as suspending the task.
We can’t > close the task in handleRevocation since we still support EAGER rebalancing, > which invokes handleRevocation at the beginning of a rebalance on all tasks. > The tasks that are actually revoked will be closed during > TaskManager#handleAssignment . The IllegalStateException is specifically > because we don’t suspend the task before attempting to close it, and the > direct transition from RUNNING → CLOSED is forbidden.
[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason
guozhangwang commented on a change in pull request #10232: URL: https://github.com/apache/kafka/pull/10232#discussion_r593515777 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse, } } } else { -requestRejoin(); Review comment: I think we do not need to, since it would be called in `resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));`. Previously we were calling rejoin twice.
[GitHub] [kafka] ableegoldman commented on pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN
ableegoldman commented on pull request #10311: URL: https://github.com/apache/kafka/pull/10311#issuecomment-797820964 @vvcephei @wcarlson5 @lct45
[GitHub] [kafka] ableegoldman opened a new pull request #10311: KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN
ableegoldman opened a new pull request #10311: URL: https://github.com/apache/kafka/pull/10311 Also check the state after the poll phase and exit the StreamThread processing loop early if the thread is in PENDING_SHUTDOWN.
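The early exit described in the PR summary (check the thread state right after the poll phase and skip the rest of the iteration if a shutdown was requested meanwhile) has the loop shape sketched below. `RunOnceSketch` and its phase methods are hypothetical placeholders, not StreamThread internals, and a hook simulates a shutdown request arriving while the thread is in poll.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical loop shape: the phases are placeholders, not StreamThread internals.
class RunOnceSketch {
    enum State { RUNNING, PENDING_SHUTDOWN }

    volatile State state = State.RUNNING;
    final List<String> log = new ArrayList<>();
    Runnable duringPoll = () -> { }; // hook to simulate a shutdown request arriving during poll

    void pollPhase() {
        log.add("poll");
        duringPoll.run();
    }

    void processAndCommit() {
        log.add("process");
    }

    void runOnce() {
        pollPhase();
        // If shutdown was requested while blocked in poll, skip the rest of this iteration.
        if (state == State.PENDING_SHUTDOWN) {
            log.add("skip");
            return;
        }
        processAndCommit();
    }

    public static void main(String[] args) {
        RunOnceSketch normal = new RunOnceSketch();
        normal.runOnce();
        System.out.println(normal.log); // [poll, process]

        RunOnceSketch stopping = new RunOnceSketch();
        stopping.duringPoll = () -> stopping.state = State.PENDING_SHUTDOWN;
        stopping.runOnce();
        System.out.println(stopping.log); // [poll, skip]
    }
}
```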
[jira] [Assigned] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception
[ https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-12462: -- Assignee: A. Sophie Blee-Goldman > Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state > exception > -- > > Key: KAFKA-12462 > URL: https://issues.apache.org/jira/browse/KAFKA-12462 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Walker Carlson >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Labels: streams > Fix For: 2.8.0 > > > A thread was removed, sending it to the PENDING_SHUTDOWN state, but went > through a rebalance before completing the shutdown. > {code:java} > // [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] > stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to > transit from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a > valid next state (org.apache.kafka.streams.processor.internals.StreamThread) > {code} > Inside StreamsRebalanceListener#onPartitionsRevoked, we have > {code:java} > // > if (streamThread.setState(State.PARTITIONS_REVOKED) != null && > !partitions.isEmpty()) > taskManager.handleRevocation(partitions); > {code} > Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we > never invoke TaskManager#handleRevocation. Currently handleRevocation is > responsible for preparing any active tasks for close, including committing > offsets and writing the checkpoint as well as suspending the task. We can’t > close the task in handleRevocation since we still support EAGER rebalancing, > which invokes handleRevocation at the beginning of a rebalance on all tasks. > The tasks that are actually revoked will be closed during > TaskManager#handleAssignment . The IllegalStateException is specifically > because we don’t suspend the task before attempting to close it, and the > direct transition from RUNNING → CLOSED is forbidden. 
[jira] [Updated] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception
[ https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson updated KAFKA-12462: --- Component/s: streams > Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state > exception > -- > > Key: KAFKA-12462 > URL: https://issues.apache.org/jira/browse/KAFKA-12462 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Walker Carlson >Priority: Blocker > Labels: streams > Fix For: 2.8.0 > > > A thread was removed, sending it to the PENDING_SHUTDOWN state, but went > through a rebalance before completing the shutdown. > {code:java} > // [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] > stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to > transit from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a > valid next state (org.apache.kafka.streams.processor.internals.StreamThread) > {code} > Inside StreamsRebalanceListener#onPartitionsRevoked, we have > {code:java} > // > if (streamThread.setState(State.PARTITIONS_REVOKED) != null && > !partitions.isEmpty()) > taskManager.handleRevocation(partitions); > {code} > Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we > never invoke TaskManager#handleRevocation. Currently handleRevocation is > responsible for preparing any active tasks for close, including committing > offsets and writing the checkpoint as well as suspending the task. We can’t > close the task in handleRevocation since we still support EAGER rebalancing, > which invokes handleRevocation at the beginning of a rebalance on all tasks. > The tasks that are actually revoked will be closed during > TaskManager#handleAssignment . The IllegalStateException is specifically > because we don’t suspend the task before attempting to close it, and the > direct transition from RUNNING → CLOSED is forbidden.
[jira] [Updated] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception
[ https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson updated KAFKA-12462: --- Affects Version/s: 2.8.0 > Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state > exception > -- > > Key: KAFKA-12462 > URL: https://issues.apache.org/jira/browse/KAFKA-12462 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Walker Carlson >Priority: Blocker > Fix For: 2.8.0
[jira] [Updated] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception
[ https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson updated KAFKA-12462: --- Labels: streams (was: ) > Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state > exception > -- > > Key: KAFKA-12462 > URL: https://issues.apache.org/jira/browse/KAFKA-12462 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Walker Carlson >Priority: Blocker > Labels: streams > Fix For: 2.8.0
[jira] [Commented] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception
[ https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300652#comment-17300652 ] Walker Carlson commented on KAFKA-12462: If we were shutting down the whole client, the thread would become dead either way. In 2.7 I think the only impact would be that the handler gets called after the close call when it shouldn't be; otherwise it might not have any effect. I suppose there is no harm in backporting, though. I definitely don't think it is worth cutting a new RC for any release that does not include removeThread. > Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state > exception > -- > > Key: KAFKA-12462 > URL: https://issues.apache.org/jira/browse/KAFKA-12462 > Project: Kafka > Issue Type: Bug >Reporter: Walker Carlson >Priority: Blocker > Fix For: 2.8.0
[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason
ableegoldman commented on a change in pull request #10232: URL: https://github.com/apache/kafka/pull/10232#discussion_r593510070 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse, } } } else { -requestRejoin(); Review comment: Ok cool, thanks. One last question then: after this refactoring, since we no longer call `requestRejoinOnResponseError` below, should we re-add the `requestRejoin()` call here? Or add a `requestRejoin` to the specific cases in the SyncGroup handler, e.g. ``` } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " + "Sent generation was {}", sentGeneration); future.raise(error); } ```
[GitHub] [kafka] levzem commented on pull request #10299: KAFKA-10070: parameterize Connect unit tests to remove code duplication
levzem commented on pull request #10299: URL: https://github.com/apache/kafka/pull/10299#issuecomment-797816232 The failing tests are unrelated to the changes: `Build / JDK 11 / kafka.api.PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup()` `Build / JDK 15 / org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining` @mimaison thanks for the review, mind taking another pass?
[jira] [Commented] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception
[ https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300650#comment-17300650 ] A. Sophie Blee-Goldman commented on KAFKA-12462: Thanks Walker! This actually seems like a long-lurking bug that was just surfaced by the removeStreamThread() feature, not caused by it. Before we could remove threads, this was only possible when shutting down the client, which we don’t test as frequently as we now test removeStreamThread(). It’s also hard to notice that a bug has caused thread(s) to die when the threads were supposed to shut down anyway. But now we might only be removing one thread, and thanks to the new exception handler we’ll shut down the whole application upon hitting this, so the thread won’t just quietly die. We should consider backporting the fix to 2.7, even though the bug isn't going to be as frequent or as bad in earlier versions. I wouldn't cut a new RC for 2.6.2 over this, but we might as well backport to get the fix into 2.7.1 whenever that comes out. > Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state > exception > -- > > Key: KAFKA-12462 > URL: https://issues.apache.org/jira/browse/KAFKA-12462 > Project: Kafka > Issue Type: Bug >Reporter: Walker Carlson >Priority: Blocker > Fix For: 2.8.0
[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
hachikuji commented on a change in pull request #10309: URL: https://github.com/apache/kafka/pull/10309#discussion_r593509351 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -247,7 +267,7 @@ private boolean isVoter(int remoteNodeId) { return voterReplicaStates.containsKey(remoteNodeId); } -private static class ReplicaState implements Comparable { +private static abstract class ReplicaState implements Comparable { Review comment: Good point.
[jira] [Updated] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception
[ https://issues.apache.org/jira/browse/KAFKA-12462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-12462: - Fix Version/s: 2.8.0 > Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state > exception > -- > > Key: KAFKA-12462 > URL: https://issues.apache.org/jira/browse/KAFKA-12462 > Project: Kafka > Issue Type: Bug >Reporter: Walker Carlson >Priority: Blocker > Fix For: 2.8.0
[jira] [Created] (KAFKA-12462) Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception
Walker Carlson created KAFKA-12462: -- Summary: Threads in PENDING_SHUTDOWN entering a rebalance can cause an illegal state exception Key: KAFKA-12462 URL: https://issues.apache.org/jira/browse/KAFKA-12462 Project: Kafka Issue Type: Bug Reporter: Walker Carlson A thread was removed, sending it to the PENDING_SHUTDOWN state, but went through a rebalance before completing the shutdown. {code:java} // [2021-03-07 04:33:39,385] DEBUG [i-07430efc31ad166b7-StreamThread-6] stream-thread [i-07430efc31ad166b7-StreamThread-6] Ignoring request to transit from PENDING_SHUTDOWN to PARTITIONS_REVOKED: only DEAD state is a valid next state (org.apache.kafka.streams.processor.internals.StreamThread) {code} Inside StreamsRebalanceListener#onPartitionsRevoked, we have {code:java} // if (streamThread.setState(State.PARTITIONS_REVOKED) != null && !partitions.isEmpty()) taskManager.handleRevocation(partitions); {code} Since PENDING_SHUTDOWN → PARTITIONS_REVOKED is a disallowed transition, we never invoke TaskManager#handleRevocation. Currently handleRevocation is responsible for preparing any active tasks for close, including committing offsets and writing the checkpoint as well as suspending the task. We can’t close the task in handleRevocation since we still support EAGER rebalancing, which invokes handleRevocation at the beginning of a rebalance on all tasks. The tasks that are actually revoked will be closed during TaskManager#handleAssignment . The IllegalStateException is specifically because we don’t suspend the task before attempting to close it, and the direct transition from RUNNING → CLOSED is forbidden. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on a change in pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`
dajac commented on a change in pull request #10243: URL: https://github.com/apache/kafka/pull/10243#discussion_r593505670 ## File path: core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala ## @@ -476,14 +476,18 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { // New instance of consumer should be assigned partitions immediately and should see committed offsets. val assignSemaphore = new Semaphore(0) val consumer = createConsumerWithGroupId(groupId) -consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener { +consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener { def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = { assignSemaphore.release() } def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = { }}) -consumer.poll(time.Duration.ofSeconds(3L)) -assertTrue(assignSemaphore.tryAcquire(1, TimeUnit.SECONDS), "Assignment did not complete on time") + +TestUtils.waitUntilTrue(() => { + consumer.poll(time.Duration.ZERO) Review comment: Should we keep a small poll timeout here? Something like 100ms?
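A minimal sketch of the wait-until-true pattern the test switches to, written in Java for illustration. `WaitUtil.waitUntilTrue` is a hypothetical helper, not Kafka's `TestUtils.waitUntilTrue`. The condition is re-evaluated until it holds or an overall deadline passes; in the test, the condition is where the consumer is polled, so the poll timeout (for example the 100 ms suggested above) bounds how long each attempt gives the consumer to make progress.

```java
import java.util.function.BooleanSupplier;

class WaitUtil {
    // Hypothetical test helper: re-evaluates `condition` until it returns true or
    // `maxWaitMs` elapses, sleeping `pauseMs` between attempts. Returns whether it succeeded.
    static boolean waitUntilTrue(BooleanSupplier condition, long maxWaitMs, long pauseMs) {
        long deadline = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // overall deadline reached
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag and give up
                return false;
            }
        }
    }
}
```

With a consumer, the condition might look like `() -> { consumer.poll(Duration.ofMillis(100)); return assignSemaphore.tryAcquire(); }`, so each attempt is one bounded poll followed by a check of whether the assignment callback has fired.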
[jira] [Updated] (KAFKA-12460) Raft should prevent truncation below high watermark
[ https://issues.apache.org/jira/browse/KAFKA-12460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-12460: -- Description: Eventually we will have to come up with some approach to recover from committed data loss in the raft quorum (something akin to unclean leader election for normal partitions). For now, we would rather be stricter and fail fast rather than allowing committed data to be silently lost. Specifically, we can prevent any attempt to truncate below the high watermark since this is a clear indication of data loss. The long term thought I have in mind is to give users an --unsafe flag or something like that which can be passed at startup in order to knowingly turn off the stricter validation in order to let the quorum recover from a disaster scenario. This needs some more thought though. (was: Eventually we will have to come up with some approach to recover from committed data loss in the raft quorum (something akin to unclean leader election for normal partitions). For now, we would rather be stricter and fail fast rather than allowing committed data to be silently lost. Specifically, we can prevent any attempt to truncate below the high watermark since this is a clear indication of data loss. The long term thought I have in mind is to give users an --unsafe flag or something like that which can be passed at startup in order to knowingly turn of the stricter validation in order to let the quorum recover from a disaster scenario. This needs some more thought though.) > Raft should prevent truncation below high watermark > --- > > Key: KAFKA-12460 > URL: https://issues.apache.org/jira/browse/KAFKA-12460 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major
[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason
guozhangwang commented on a change in pull request #10232: URL: https://github.com/apache/kafka/pull/10232#discussion_r593504141 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse, } } } else { -requestRejoin(); Review comment: I added that function for the SyncGroup handler to handle the retriable `COORDINATOR_NOT_AVAILABLE / NOT_COORDINATOR` errors and any unexpected error. After the refactoring PR, they all fall into the fallback in `joinGroupIfNeeded`: ``` final RuntimeException exception = future.exception(); resetJoinGroupFuture(); if (exception instanceof UnknownMemberIdException || exception instanceof IllegalGenerationException || exception instanceof RebalanceInProgressException || exception instanceof MemberIdRequiredException) continue; else if (!future.isRetriable()) throw exception; resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); timer.sleep(rebalanceConfig.retryBackoffMs); ``` This is part of the principle I mentioned: ``` We may reset generation and request rejoin in two different places: 1) in join/sync-group handler, and 2) in joinGroupIfNeeded, when the future is received. The principle is that these two should not overlap, and 2) is used as a fallback for those common errors from join/sync that we do not handle specifically. ``` But I forgot to remove this function as part of the second pass; will remove.
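The fallback branch quoted above can be modeled as a small decision function. This is a hypothetical sketch: the exception classes below are empty stand-ins that only mirror the consumer's names, and `Outcome`, `RejoinFallback.classify` and the `retriable` flag are illustrative assumptions rather than the consumer's actual API. It shows the ordering in the quoted code: the four "already handled" errors loop again, non-retriable errors are thrown, and everything else lands in the single reset-and-rejoin path.

```java
// Hypothetical stand-ins that only mirror the consumer's exception names.
class UnknownMemberIdException extends RuntimeException {}
class IllegalGenerationException extends RuntimeException {}
class RebalanceInProgressException extends RuntimeException {}
class MemberIdRequiredException extends RuntimeException {}

enum Outcome {
    RETRY_IMMEDIATELY, // the handler already dealt with it; loop again ("continue")
    THROW,             // non-retriable: surface to the caller
    RESET_AND_REJOIN   // fallback: reset state, request rejoin, then back off
}

class RejoinFallback {
    // Mirrors the branch order of the quoted joinGroupIfNeeded snippet.
    static Outcome classify(RuntimeException exception, boolean retriable) {
        if (exception instanceof UnknownMemberIdException
                || exception instanceof IllegalGenerationException
                || exception instanceof RebalanceInProgressException
                || exception instanceof MemberIdRequiredException) {
            return Outcome.RETRY_IMMEDIATELY;
        }
        if (!retriable) {
            return Outcome.THROW;
        }
        return Outcome.RESET_AND_REJOIN;
    }
}
```

Keeping the reset in one place is what lets the join/sync handlers stay free of overlapping `requestRejoin()` calls for errors they do not treat specially.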
[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
dajac commented on a change in pull request #10304: URL: https://github.com/apache/kafka/pull/10304#discussion_r593503408 ## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -40,9 +40,16 @@ object LogDirsCommand { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) +val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray +case None => clusterBrokers +} + +val nonExistBrokers: Array[Int] = brokerList.filterNot(brokerId => clusterBrokers.contains(brokerId)) +if (!nonExistBrokers.isEmpty) { + System.err.println(s"The given node(s) does not exist from broker-list ${nonExistBrokers.mkString(",")}") + sys.exit(1) Review comment: Should we do this only when handling the brokers provided by the user? It does not make sense to validate the list of brokers otherwise. What do you think?
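A minimal sketch of the reviewer's suggestion, written in Java for illustration even though the tool itself is Scala. `BrokerListValidation.resolveBrokers` and its error message are hypothetical. When no broker list is given, the cluster's own broker IDs are used and nothing needs validating; only a user-supplied list is checked for IDs that are not in the cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

class BrokerListValidation {
    // Hypothetical helper: decides which broker IDs to describe.
    static List<Integer> resolveBrokers(Optional<String> userBrokerList, Set<Integer> clusterBrokers) {
        if (!userBrokerList.isPresent()) {
            // Defaulting to the cluster's own brokers: they exist by definition, so skip validation.
            return clusterBrokers.stream().sorted().collect(Collectors.toList());
        }
        // Same parsing as the patch: split on ',', drop empty entries, parse as ints.
        List<Integer> requested = Arrays.stream(userBrokerList.get().split(","))
            .filter(s -> !s.isEmpty())
            .map(Integer::parseInt)
            .collect(Collectors.toList());
        List<Integer> missing = new ArrayList<>();
        for (Integer id : requested) {
            if (!clusterBrokers.contains(id)) {
                missing.add(id);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                "The following broker IDs do not exist in the cluster: " + missing);
        }
        return requested;
    }
}
```

A command-line wrapper could then print the exception message to stderr and exit with a non-zero status, as the patch under review does with `sys.exit(1)`.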
[GitHub] [kafka] jsancio commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
jsancio commented on a change in pull request #10309: URL: https://github.com/apache/kafka/pull/10309#discussion_r593492361 ## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ## @@ -247,7 +267,7 @@ private boolean isVoter(int remoteNodeId) { return voterReplicaStates.containsKey(remoteNodeId); } -private static class ReplicaState implements Comparable { +private static abstract class ReplicaState implements Comparable { Review comment: Do we really need to distinguish between `VoterState` and `ObserverState`? For example, the only difference is `hasAcknowledgedLeader`. I would argue that we could just move this field to `ReplicaState` and say that for observers this value is always false, or that the value is ignored. I am leaning towards just updating the value irrespective of whether it is a voter or an observer. This will probably be useful when we implement quorum reassignment. We can document whatever semantics you decide on as a comment for this type. ## File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java ## @@ -140,6 +160,22 @@ public void testUpdateHighWatermarkQuorumSizeThree() { assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); } +@Test +public void testNonMonotonicHighWatermarkUpdate() { +MockTime time = new MockTime(); +int node1 = 1; +LeaderState state = newLeaderState(mkSet(localId, node1), 0L); +state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(10L)); +state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L)); +assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); + +// Follower crashes and disk is lost. It fetches an earlier offset to rebuild state. +state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(5L)); Review comment: Let's check that this call returns `false`. Let's also add a test that calls `getVoterEndOffsets` and checks that the returned map is correct.
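A minimal sketch of the behaviour the new test exercises, under simplifying assumptions. `SketchLeaderState` and its methods are hypothetical and ignore epochs, timestamps and `LogOffsetMetadata`; the leader is modeled as just another voter, and the high watermark is taken as the largest offset reached by a majority of voters. Following the review, one `ReplicaState` type serves both voters and observers. `updateReplicaState` returns whether the high watermark advanced, so a follower that regresses after losing its disk leaves the high watermark unchanged and the call returns `false`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

class SketchLeaderState {
    // One state type for voters and observers; hasAcknowledgedLeader is updated for every
    // replica, but only voters count toward the high watermark.
    static final class ReplicaState {
        long endOffset = -1L; // -1 means no fetch seen yet
        boolean hasAcknowledgedLeader = false;
    }

    private final Set<Integer> voters;
    private final Map<Integer, ReplicaState> replicas = new HashMap<>();
    private OptionalLong highWatermark = OptionalLong.empty();

    SketchLeaderState(Set<Integer> voters) {
        this.voters = voters;
        for (Integer id : voters) {
            replicas.put(id, new ReplicaState());
        }
    }

    // Records a replica's fetch offset; returns true only if the high watermark advanced.
    boolean updateReplicaState(int replicaId, long endOffset) {
        ReplicaState state = replicas.computeIfAbsent(replicaId, id -> new ReplicaState());
        state.endOffset = endOffset;
        state.hasAcknowledgedLeader = true;
        return maybeAdvanceHighWatermark();
    }

    Map<Integer, Long> getVoterEndOffsets() {
        Map<Integer, Long> result = new HashMap<>();
        for (Integer id : voters) {
            result.put(id, replicas.get(id).endOffset);
        }
        return result;
    }

    OptionalLong highWatermark() {
        return highWatermark;
    }

    private boolean maybeAdvanceHighWatermark() {
        List<Long> offsets = new ArrayList<>(getVoterEndOffsets().values());
        offsets.sort(Comparator.reverseOrder());
        // With offsets sorted descending, the one at index size/2 is reached by a majority.
        long candidate = offsets.get(voters.size() / 2);
        if (candidate < 0) {
            return false;
        }
        if (highWatermark.isPresent() && candidate <= highWatermark.getAsLong()) {
            // Never move backwards, e.g. when a follower lost its disk and refetches from 5.
            return false;
        }
        highWatermark = OptionalLong.of(candidate);
        return true;
    }
}
```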
[jira] [Created] (KAFKA-12461) Extend LogManager to cover the metadata topic
Jason Gustafson created KAFKA-12461: --- Summary: Extend LogManager to cover the metadata topic Key: KAFKA-12461 URL: https://issues.apache.org/jira/browse/KAFKA-12461 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson The `@metadata` topic is not managed by `LogManager` since it uses a new snapshot-based retention policy. This means that it is not covered by the recovery and high watermark checkpoints. It would be useful to fix this. We can either extend `LogManager` so that it is aware of the snapshotting semantics implemented by the `@metadata` topic, or we can create something like a `RaftLogManager`.
[GitHub] [kafka] abbccdda commented on a change in pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager
abbccdda commented on a change in pull request #10135: URL: https://github.com/apache/kafka/pull/10135#discussion_r593492752 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -179,17 +181,16 @@ class BrokerServer( val controllerNodes = RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes) - val forwardingChannelManager = BrokerToControllerChannelManager( + clientToControllerChannelManager = BrokerToControllerChannelManager( controllerNodeProvider, time, metrics, config, -channelName = "forwarding", +channelName = "clientToControllerChannel", Review comment: Sg!
[GitHub] [kafka] hachikuji opened a new pull request #10310: KAFKA-12460; Do not allow raft truncation below high watermark
hachikuji opened a new pull request #10310: URL: https://github.com/apache/kafka/pull/10310 Initially we want to be strict about the loss of committed data for the `@metadata` topic. This patch ensures that truncation below the high watermark is not allowed. Note that `MockLog` already had the logic to do so, so the patch adds a similar check to `KafkaMetadataLog`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
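A minimal sketch of the kind of guard described, assuming a toy in-memory log. `SketchLog` and its methods are hypothetical; this is neither `KafkaMetadataLog` nor `MockLog`. Any attempt to truncate to an offset below the high watermark fails fast instead of silently discarding committed records.

```java
import java.util.ArrayList;
import java.util.List;

class SketchLog {
    private final List<String> records = new ArrayList<>();
    private long highWatermark = 0L; // records at offsets below this value are committed

    long endOffset() {
        return records.size();
    }

    void append(String record) {
        records.add(record);
    }

    // The high watermark only moves forward and never past the end of the log.
    void updateHighWatermark(long offset) {
        if (offset < highWatermark || offset > endOffset()) {
            throw new IllegalArgumentException("Invalid high watermark " + offset);
        }
        highWatermark = offset;
    }

    // Removes all records at offsets >= offset, refusing to drop committed data.
    void truncateTo(long offset) {
        if (offset < highWatermark) {
            throw new IllegalStateException("Attempt to truncate to offset " + offset
                + ", which is below the current high watermark " + highWatermark);
        }
        while (records.size() > offset) {
            records.remove(records.size() - 1);
        }
    }
}
```

Throwing rather than logging matches the "fail fast" intent of the issue; the possible `--unsafe` escape hatch mentioned in KAFKA-12460 would presumably bypass a check like this, but that design is still open.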
[GitHub] [kafka] hachikuji commented on a change in pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager
hachikuji commented on a change in pull request #10135: URL: https://github.com/apache/kafka/pull/10135#discussion_r593485718 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -179,17 +181,16 @@ class BrokerServer( val controllerNodes = RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes) - val forwardingChannelManager = BrokerToControllerChannelManager( + clientToControllerChannelManager = BrokerToControllerChannelManager( controllerNodeProvider, time, metrics, config, -channelName = "forwarding", +channelName = "clientToControllerChannel", Review comment: How about "controllerForwardingChannel"? I think it fits both cases we're handling.
[jira] [Updated] (KAFKA-12460) Raft should prevent truncation below high watermark
[ https://issues.apache.org/jira/browse/KAFKA-12460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12460: Description: Eventually we will have to come up with some approach to recover from committed data loss in the raft quorum (something akin to unclean leader election for normal partitions). For now, we would rather be stricter and fail fast rather than allowing committed data to be silently lost. Specifically, we can prevent any attempt to truncate below the high watermark since this is a clear indication of data loss. The long term thought I have in mind is to give users an --unsafe flag or something like that which can be passed at startup in order to knowingly turn of the stricter validation in order to let the quorum recover from a disaster scenario. This needs some more thought though. (was: Eventually we will have to come up with some approach to recover from committed data loss in the raft quorum (something akin to unclean leader election for normal partitions). For now, we would rather be stricter and fail fast rather than allowing committed data to be silently lost. The long term thought I have in mind is to give users an --unsafe flag or something like that which can be passed at startup in order to knowingly turn of the stricter validation in order to let the quorum recover from a disaster scenario. This needs some more thought though.) > Raft should prevent truncation below high watermark > --- > > Key: KAFKA-12460 > URL: https://issues.apache.org/jira/browse/KAFKA-12460 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Eventually we will have to come up with some approach to recover from > committed data loss in the raft quorum (something akin to unclean leader > election for normal partitions). For now, we would rather be stricter and > fail fast rather than allowing committed data to be silently lost. 
> Specifically, we can prevent any attempt to truncate below the high watermark > since this is a clear indication of data loss. The long-term thought I have > in mind is to give users an --unsafe flag or something like that which can be > passed at startup to knowingly turn off the stricter validation so that the > quorum can recover from a disaster scenario. This needs some > more thought though.
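The stricter validation described in KAFKA-12460 amounts to a guard in the truncation path. The sketch below is a minimal illustration under assumptions: `GuardedRaftLog` and its `truncateTo` method are hypothetical and do not reproduce the raft module's actual log classes. Truncating to an offset at or above the high watermark is allowed, while truncating below it throws, since that would discard records already known to be committed.

```java
// Hypothetical, simplified log used only to illustrate the check; it is not Kafka's raft log API.
class GuardedRaftLog {
    private long logEndOffset;
    private final long highWatermark;

    GuardedRaftLog(long logEndOffset, long highWatermark) {
        if (highWatermark > logEndOffset) {
            throw new IllegalArgumentException("high watermark cannot exceed log end offset");
        }
        this.logEndOffset = logEndOffset;
        this.highWatermark = highWatermark;
    }

    long logEndOffset() { return logEndOffset; }
    long highWatermark() { return highWatermark; }

    // Removes all offsets >= truncationOffset, but fails fast instead of
    // discarding records below the high watermark (committed data).
    void truncateTo(long truncationOffset) {
        if (truncationOffset < highWatermark) {
            throw new IllegalStateException("Refusing to truncate to offset " + truncationOffset
                + " below the high watermark " + highWatermark + ": committed data would be lost");
        }
        logEndOffset = Math.min(logEndOffset, truncationOffset);
    }
}
```

An `--unsafe` startup flag, as floated in the description, would correspond to skipping this check; that option is not modeled here.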
[jira] [Created] (KAFKA-12460) Raft should not prevent truncation below high watermark
Jason Gustafson created KAFKA-12460: --- Summary: Raft should not prevent truncation below high watermark Key: KAFKA-12460 URL: https://issues.apache.org/jira/browse/KAFKA-12460 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson Eventually we will have to come up with some approach to recover from committed data loss in the raft quorum (something akin to unclean leader election for normal partitions). For now, we would rather be stricter and fail fast instead of allowing committed data to be silently lost. The long-term thought I have in mind is to give users an --unsafe flag or something like that which can be passed at startup to knowingly turn off the stricter validation so that the quorum can recover from a disaster scenario. This needs some more thought though.
[jira] [Updated] (KAFKA-12460) Raft should prevent truncation below high watermark
[ https://issues.apache.org/jira/browse/KAFKA-12460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12460: Summary: Raft should prevent truncation below high watermark (was: Raft should not prevent truncation below high watermark) > Raft should prevent truncation below high watermark > --- > > Key: KAFKA-12460 > URL: https://issues.apache.org/jira/browse/KAFKA-12460 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Eventually we will have to come up with some approach to recover from > committed data loss in the raft quorum (something akin to unclean leader > election for normal partitions). For now, we would rather be stricter and > fail fast instead of allowing committed data to be silently lost. The long-term > thought I have in mind is to give users an --unsafe flag or something > like that which can be passed at startup to knowingly turn off the > stricter validation so that the quorum can recover from a disaster > scenario. This needs some more thought though.
[GitHub] [kafka] hachikuji opened a new pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
hachikuji opened a new pull request #10309: URL: https://github.com/apache/kafka/pull/10309 Currently the Raft leader raises an exception if there is a non-monotonic update to the fetch offset of a replica. In a situation where the replica had lost its disk state, this would prevent the replica from being able to recover. In this patch, we relax the validation to address this problem. It is worth pointing out that this validation could not be relied on to protect against data loss after a voter has lost committed state. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
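A hedged sketch of the difference between the strict and the relaxed validation, assuming a hypothetical leader-side `ReplicaFetchState` class (not the raft module's real replica-tracking code). With `strict = true`, a fetch offset lower than the previously recorded one throws; with `strict = false`, the lower offset is accepted so that a replica that lost its disk state can start fetching again from an earlier offset.

```java
// Hypothetical tracker for one remote replica's fetch offset, as seen by the leader.
class ReplicaFetchState {
    private long endOffset = -1L;  // -1 means no fetch has been seen yet
    private final boolean strict;

    ReplicaFetchState(boolean strict) { this.strict = strict; }

    long endOffset() { return endOffset; }

    // Records the replica's fetch offset and returns true if it moved backwards.
    boolean updateFetchOffset(long fetchOffset) {
        boolean regressed = endOffset >= 0 && fetchOffset < endOffset;
        if (regressed && strict) {
            // Old behavior: a non-monotonic update is treated as an error.
            throw new IllegalStateException("Non-monotonic fetch offset " + fetchOffset + " < " + endOffset);
        }
        // Relaxed behavior: accept the lower offset so the replica can recover.
        endOffset = fetchOffset;
        return regressed;
    }
}
```

The relaxed path still reports whether the offset regressed, which a caller could log. As the PR description points out, the strict check was never a reliable protection against data loss in the first place.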
[GitHub] [kafka] ableegoldman commented on a change in pull request #10300: KAFKA-12452: Remove deprecated overloads of ProcessorContext#forward
ableegoldman commented on a change in pull request #10300: URL: https://github.com/apache/kafka/pull/10300#discussion_r593468505 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java ## @@ -37,7 +37,6 @@ public class ProcessorNode { -// TODO: 'children' can be removed when #forward() via index is removed Review comment: Fine with me
[GitHub] [kafka] ableegoldman commented on a change in pull request #10300: KAFKA-12452: Remove deprecated overloads of ProcessorContext#forward
ableegoldman commented on a change in pull request #10300: URL: https://github.com/apache/kafka/pull/10300#discussion_r593468044 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ## @@ -280,61 +280,6 @@ public void testDrivingSimpleTopology() { assertTrue(outputTopic1.isEmpty()); } - -@Test -public void testDrivingMultiplexingTopology() { Review comment: Just wondering, why do we remove this test (and the one below)?
[GitHub] [kafka] ableegoldman commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason
ableegoldman commented on a change in pull request #10232: URL: https://github.com/apache/kafka/pull/10232#discussion_r593463366 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse, } } } else { -requestRejoin(); Review comment: @guozhangwang I think something may have been messed up during a merge/rebase: I no longer see `requestRejoinOnResponseError` being invoked anywhere
[jira] [Comment Edited] (KAFKA-12426) Missing logic to create partition.metadata files in RaftReplicaManager
[ https://issues.apache.org/jira/browse/KAFKA-12426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300616#comment-17300616 ] Justine Olshan edited comment on KAFKA-12426 at 3/12/21, 9:26 PM: -- While working on this bug, I caught another small bug in ReplicaManager due to changing the position of the topic ID check (to prevent handling a request for an inconsistent ID). I realized that on the first LISR received on a newly created topic, the log will not yet be created when `checkOrSetTopicId` is called. This means the log will not store the topic ID until the second LISR request. This bug means that we lose the benefits of topic IDs on this first LISR pass, but we don't break anything compared with LISR before topic IDs. A similar event will occur when restarting brokers. Logs will not yet be associated with the Partition object, so we won't be able to check the topic ID on the first LISR pass. This means the partition.metadata file is unable to serve its intended purpose in this case, so I think this case is a little worse. Neither case is worse than the pre-topic ID behavior we had before. There is a pretty simple fix to this which I think I can include in this PR. [~ijuma] [~vvcephei], do you think this is a blocker for 2.8? was (Author: jolshan): While working on this bug, I caught another small bug in ReplicaManager due to changing the position of the topic ID check (to prevent handling a request for an inconsistent ID). I realized that on the first LISR received on a newly created topic, the log will not yet be created when `checkOrSetTopicId` is called. This means the log will not store the topic ID until the second LISR request. This bug means that we lose the benefits of topic IDs on this first LISR pass, but we don't break anything compared with LISR before topic IDs. A similar event will occur when restarting brokers.
Logs will not yet be associated with the Partition object, so we won't be able to check the topic ID on the first LISR pass. This means the partition.metadata file is unable to serve its intended purpose in this case, so I think this case is a little worse. There is a pretty simple fix to this which I think I can include in this PR. [~ijuma] [~vvcephei], do you think this is a blocker for 2.8? > Missing logic to create partition.metadata files in RaftReplicaManager > -- > > Key: KAFKA-12426 > URL: https://issues.apache.org/jira/browse/KAFKA-12426 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Justine Olshan >Priority: Major > Labels: kip-500 > > As part of KIP-516, the broker should create a partition.metadata file for > each partition in order to keep track of the topicId. This is done through > `Partition.checkOrSetTopicId`. We have the logic for this in > `ReplicaManager.becomeLeaderOrFollower` already, but we need to implement > analogous logic in `RaftReplicaManager.handleMetadataRecords`.
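The first-LISR gap can be reproduced with a simplified model of the check-or-set logic shown in the `Partition.checkOrSetTopicId` diff later in this thread. `SimplePartition` and `SimpleLog` are hypothetical stand-ins, and `java.util.UUID` replaces Kafka's `Uuid`. A null or all-zero ID is treated as "topic IDs not supported", a missing log cannot be inconsistent, and a log without an ID adopts the received one. Because the first call happens before the log exists, it returns `true` without storing anything.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for a log plus the topic ID in its partition.metadata file.
class SimpleLog {
    Optional<UUID> topicId = Optional.empty();
}

// Hypothetical stand-in for Partition; names are illustrative, not Kafka's.
class SimplePartition {
    static final UUID ZERO_UUID = new UUID(0L, 0L);
    Optional<SimpleLog> log = Optional.empty();

    boolean checkOrSetTopicId(UUID receivedTopicId) {
        // A null or zero ID means topic IDs are not supported: nothing to compare.
        if (receivedTopicId == null || receivedTopicId.equals(ZERO_UUID)) {
            return true;
        }
        // No log yet: the ID cannot be inconsistent, but it is not stored anywhere either.
        if (!log.isPresent()) {
            return true;
        }
        SimpleLog current = log.get();
        if (!current.topicId.isPresent()) {
            current.topicId = Optional.of(receivedTopicId);  // adopt the received ID
            return true;
        }
        return current.topicId.get().equals(receivedTopicId);
    }

    // Models the log being created later in request handling.
    void createLogIfAbsent() {
        if (!log.isPresent()) {
            log = Optional.of(new SimpleLog());
        }
    }
}
```

Calling `checkOrSetTopicId(id)` and then `createLogIfAbsent()` leaves the log without an ID; only a second `checkOrSetTopicId(id)` persists it, which mirrors the behavior described in the comment.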
[jira] [Commented] (KAFKA-12426) Missing logic to create partition.metadata files in RaftReplicaManager
[ https://issues.apache.org/jira/browse/KAFKA-12426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300616#comment-17300616 ] Justine Olshan commented on KAFKA-12426: While working on this bug, I caught another small bug in ReplicaManager due to changing the position of the topic ID check (to prevent handling a request for an inconsistent ID). I realized that on the first LISR received on a newly created topic, the log will not yet be created when `checkOrSetTopicId` is called. This means the log will not store the topic ID until the second LISR request. This bug means that we lose the benefits of topic IDs on this first LISR pass, but we don't break anything compared with LISR before topic IDs. A similar event will occur when restarting brokers. Logs will not yet be associated with the Partition object, so we won't be able to check the topic ID on the first LISR pass. This means the partition.metadata file is unable to serve its intended purpose in this case, so I think this case is a little worse. There is a pretty simple fix to this which I think I can include in this PR. [~ijuma] [~vvcephei], do you think this is a blocker for 2.8? > Missing logic to create partition.metadata files in RaftReplicaManager > -- > > Key: KAFKA-12426 > URL: https://issues.apache.org/jira/browse/KAFKA-12426 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Justine Olshan >Priority: Major > Labels: kip-500 > > As part of KIP-516, the broker should create a partition.metadata file for > each partition in order to keep track of the topicId. This is done through > `Partition.checkOrSetTopicId`. We have the logic for this in > `ReplicaManager.becomeLeaderOrFollower` already, but we need to implement > analogous logic in `RaftReplicaManager.handleMetadataRecords`.
[GitHub] [kafka] ijuma merged pull request #10307: MINOR: Log project, gradle, java and scala versions at the start of the build
ijuma merged pull request #10307: URL: https://github.com/apache/kafka/pull/10307
[GitHub] [kafka] jsancio commented on a change in pull request #10278: KAFKA-10526: leader fsync deferral on write
jsancio commented on a change in pull request #10278: URL: https://github.com/apache/kafka/pull/10278#discussion_r593419637 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1876,7 +1876,7 @@ private long maybeAppendBatches( BatchAccumulator.CompletedBatch batch = iterator.next(); appendBatch(state, batch, currentTimeMs); } -flushLeaderLog(state, currentTimeMs); +//flushLeaderLog(state, currentTimeMs); Review comment: The invariant that the leader must satisfy is `highWatermark <= flushOffset`. The current implementation satisfies this by flushing after every append, which implicitly makes `flushOffset == logEndOffset`. At a high level, I think the goal is to allow `highWatermark <= flushOffset <= logEndOffset`. On the follower, things are a little different: the follower must have `flushOffset == logEndOffset` before it can send a `Fetch` request. This is because the leader assumes that the fetch offset in the `Fetch` request is the offset that the follower has successfully replicated. The advantage of appending as soon as possible without flushing is lower replication latency: the leader cannot replicate record batches to the followers and observers until they have been appended to the log. I am not sure exactly how we want to implement this since I haven't looked at the details, but I think you are correct that on the leader side we want to increase the `flushOffset` in the `Fetch` request handling code as the leader attempts to increase the high-watermark.
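To make the leader-side invariant concrete, here is a small model under stated assumptions: `DeferredFlushLeaderLog` is hypothetical, offsets are plain counters, and `flush` stands in for an fsync. Appends only move the log end offset, so batches are available for replication immediately. The flush is deferred until the leader tries to advance the high watermark during `Fetch` handling, and it only goes as far as the new high watermark needs, which keeps `highWatermark <= flushOffset <= logEndOffset`. The follower rule (flush up to `logEndOffset` before sending `Fetch`) is not modeled.

```java
// Hypothetical leader-side model of deferred fsync; not KafkaRaftClient code.
class DeferredFlushLeaderLog {
    private long logEndOffset = 0L;
    private long flushOffset = 0L;
    private long highWatermark = 0L;

    long logEndOffset() { return logEndOffset; }
    long flushOffset() { return flushOffset; }
    long highWatermark() { return highWatermark; }

    // Append without flushing: the records can be replicated right away.
    void append(int numRecords) {
        logEndOffset += numRecords;
    }

    // Stand-in for fsync: everything below `offset` becomes durable.
    void flush(long offset) {
        flushOffset = Math.max(flushOffset, Math.min(offset, logEndOffset));
    }

    // Called while handling Fetch requests, once a majority has replicated up to candidateHw.
    void maybeAdvanceHighWatermark(long candidateHw) {
        long target = Math.min(candidateHw, logEndOffset);
        if (target <= highWatermark) {
            return;  // the high watermark never moves backwards
        }
        if (target > flushOffset) {
            flush(target);  // flush lazily, only as far as the high watermark needs
        }
        highWatermark = target;
        if (!(highWatermark <= flushOffset && flushOffset <= logEndOffset)) {
            throw new IllegalStateException("invariant highWatermark <= flushOffset <= logEndOffset violated");
        }
    }
}
```

In this model, appending ten records leaves `flushOffset` at 0 until `maybeAdvanceHighWatermark(6)` flushes exactly up to 6.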
[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager
jolshan commented on a change in pull request #10282: URL: https://github.com/apache/kafka/pull/10282#discussion_r593362414 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition, } /** - * Checks if the topic ID provided in the request is consistent with the topic ID in the log. + * Checks if the topic ID received is consistent with the topic ID in the log. * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID. * - * @param requestTopicId the topic ID from the request - * @return true if the request topic id is consistent, false otherwise + * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records + * @return true if the received topic id is consistent, false otherwise */ - def checkOrSetTopicId(requestTopicId: Uuid): Boolean = { -// If the request had an invalid topic ID, then we assume that topic IDs are not supported. -// The topic ID was not inconsistent, so return true. -// If the log is empty, then we can not say that topic ID is inconsistent, so return true. -if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID) - true -else { + def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = { Review comment: Hmmm I'm not a huge fan of this either since it also seems to be waiting to set the ID and it adds complexity to recovery. I'll take another look. Maybe I'm missing something.
[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager
jolshan commented on a change in pull request #10282: URL: https://github.com/apache/kafka/pull/10282#discussion_r593351308 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition, } /** - * Checks if the topic ID provided in the request is consistent with the topic ID in the log. + * Checks if the topic ID received is consistent with the topic ID in the log. * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID. * - * @param requestTopicId the topic ID from the request - * @return true if the request topic id is consistent, false otherwise + * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records + * @return true if the received topic id is consistent, false otherwise */ - def checkOrSetTopicId(requestTopicId: Uuid): Boolean = { -// If the request had an invalid topic ID, then we assume that topic IDs are not supported. -// The topic ID was not inconsistent, so return true. -// If the log is empty, then we can not say that topic ID is inconsistent, so return true. -if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID) - true -else { + def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = { Review comment: Hmm, this is a little tricky. One idea I had would be to check or set the ID in the partition object only (not the file or in Log memory); then, when the partitions are passed in to `makeLeaders/Followers` and a new log is created, we will have the topic ID easily accessible to create the partition.metadata file. It kind of changes how we do things. We would either store in partition **_instead of_** or **_in addition to_** Log. Not sure if we also want to change this for the ZK code. Thoughts?
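The idea above (keep the checked ID on the partition object and write partition.metadata when `makeLeaders/Followers` creates the log) might look roughly like this. All names are hypothetical: `MetadataFile` stands in for both the new log and its partition.metadata file, `createLog` stands in for the log-creation path, and the null/zero-ID handling is omitted for brevity.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-in for the partition.metadata file of a newly created log.
class MetadataFile {
    Optional<UUID> topicId = Optional.empty();
}

// Hypothetical partition that keeps the topic ID in memory until the log exists.
class StashingPartition {
    private Optional<UUID> topicId = Optional.empty();
    private Optional<MetadataFile> log = Optional.empty();

    // Check or set the ID on the partition object only; no file is touched here.
    boolean checkOrSetTopicId(UUID receivedTopicId) {
        if (!topicId.isPresent()) {
            topicId = Optional.of(receivedTopicId);
            return true;
        }
        return topicId.get().equals(receivedTopicId);
    }

    // Models the makeLeaders/makeFollowers path creating the log.
    MetadataFile createLog() {
        if (!log.isPresent()) {
            MetadataFile file = new MetadataFile();
            file.topicId = topicId;  // the ID is already known, even on the first pass
            log = Optional.of(file);
        }
        return log.get();
    }
}
```

This corresponds to the "in addition to Log" variant: the partition holds the ID, and the file receives a copy when the log is created.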
[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager
jolshan commented on a change in pull request #10282: URL: https://github.com/apache/kafka/pull/10282#discussion_r593351308 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition, } /** - * Checks if the topic ID provided in the request is consistent with the topic ID in the log. + * Checks if the topic ID received is consistent with the topic ID in the log. * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID. * - * @param requestTopicId the topic ID from the request - * @return true if the request topic id is consistent, false otherwise + * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records + * @return true if the received topic id is consistent, false otherwise */ - def checkOrSetTopicId(requestTopicId: Uuid): Boolean = { -// If the request had an invalid topic ID, then we assume that topic IDs are not supported. -// The topic ID was not inconsistent, so return true. -// If the log is empty, then we can not say that topic ID is inconsistent, so return true. -if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID) - true -else { + def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = { Review comment: Hmm, this is a little tricky. One idea I had would be to set the ID in the partition object only (not the file or in Log memory); then, when the partitions are passed in to `makeLeaders/Followers` and a new log is created, we will have the topic ID easily accessible to create the partition.metadata file. It kind of changes how we do things. We would either store in partition **_instead of_** or **_in addition to_** Log. Not sure if we also want to change this for the ZK code. Thoughts?
[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager
jolshan commented on a change in pull request #10282: URL: https://github.com/apache/kafka/pull/10282#discussion_r593351308 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition, } /** - * Checks if the topic ID provided in the request is consistent with the topic ID in the log. + * Checks if the topic ID received is consistent with the topic ID in the log. * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID. * - * @param requestTopicId the topic ID from the request - * @return true if the request topic id is consistent, false otherwise + * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records + * @return true if the received topic id is consistent, false otherwise */ - def checkOrSetTopicId(requestTopicId: Uuid): Boolean = { -// If the request had an invalid topic ID, then we assume that topic IDs are not supported. -// The topic ID was not inconsistent, so return true. -// If the log is empty, then we can not say that topic ID is inconsistent, so return true. -if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID) - true -else { + def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = { Review comment: Hmm, this is a little tricky. One idea I had would be to check or set the ID in the partition object only (not the file); then, when the partitions are passed in to `makeLeaders/Followers` and a new log is created, we will have the topic ID easily accessible to create the partition.metadata file. It kind of changes how we do things. We would either store in partition **_instead of_** or **_in addition to_** Log. Not sure if we also want to change this for the ZK code. Thoughts?
[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager
jolshan commented on a change in pull request #10282: URL: https://github.com/apache/kafka/pull/10282#discussion_r593351308 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition, } /** - * Checks if the topic ID provided in the request is consistent with the topic ID in the log. + * Checks if the topic ID received is consistent with the topic ID in the log. * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID. * - * @param requestTopicId the topic ID from the request - * @return true if the request topic id is consistent, false otherwise + * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records + * @return true if the received topic id is consistent, false otherwise */ - def checkOrSetTopicId(requestTopicId: Uuid): Boolean = { -// If the request had an invalid topic ID, then we assume that topic IDs are not supported. -// The topic ID was not inconsistent, so return true. -// If the log is empty, then we can not say that topic ID is inconsistent, so return true. -if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID) - true -else { + def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = { Review comment: Hmm, this is a little tricky. One idea I had would be to check or set the ID in the partition object only (not the file); then, when the partitions are passed in to `makeLeaders/Followers` and a new log is created, we will have the topic ID easily accessible to create the partition.metadata file. It kind of changes how we do things. We would either store in partition _instead of_ or _in addition to_ Log. Not sure if we also want to change this for the ZK code. Thoughts?
[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager
jolshan commented on a change in pull request #10282: URL: https://github.com/apache/kafka/pull/10282#discussion_r593330038 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition, } /** - * Checks if the topic ID provided in the request is consistent with the topic ID in the log. + * Checks if the topic ID received is consistent with the topic ID in the log. * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID. * - * @param requestTopicId the topic ID from the request - * @return true if the request topic id is consistent, false otherwise + * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records + * @return true if the received topic id is consistent, false otherwise */ - def checkOrSetTopicId(requestTopicId: Uuid): Boolean = { -// If the request had an invalid topic ID, then we assume that topic IDs are not supported. -// The topic ID was not inconsistent, so return true. -// If the log is empty, then we can not say that topic ID is inconsistent, so return true. -if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID) - true -else { + def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = { Review comment: Hmm. Looking into this further, it seems that the log is created in RaftReplicaManager when making a leader or a follower. This occurs in the `handleMetadataRecords` method (non-deferring case) and the `endMetadataChangeDeferral` method. Each requires a few methods to pass through before we get to `LogManager.getOrCreateLog`, which makes passing a topic ID through not ideal (especially since some of this code is shared with the ZK path), but not impossible. One thing I am wondering though is how I tested the changes. I think I assumed the log was already created, but this code suggests that the log will not be created until `makeLeaders/Followers` in `handleMetadataRecords` in the non-deferring case, and not created until `endMetadataChangeDeferral` in the deferring case. This means that where I set topic IDs now, we won't have a log on the first pass! So I think maybe it is worth it to pass the ID through these methods. I think I can keep the check of the topic ID where it is though.
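The alternative raised here, passing the ID down to log creation, is sketched below with the intermediate `makeLeaders`/`makeFollowers` layers collapsed into a single call. `SimpleLogManager.getOrCreateLog(String, Optional<UUID>)` is a hypothetical simplification and does not reproduce the real `LogManager.getOrCreateLog` signature; passing `Optional.empty()` models a caller, such as the shared ZK path, that has no ID to supply.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical log entry holding the topic ID recorded in its partition.metadata file.
class ManagedLog {
    Optional<UUID> topicId = Optional.empty();
}

// Hypothetical log manager keyed by a "topic-partition" string.
class SimpleLogManager {
    private final Map<String, ManagedLog> logs = new HashMap<>();

    // The topic ID travels with the creation call, so it is stored on the first pass.
    ManagedLog getOrCreateLog(String topicPartition, Optional<UUID> topicId) {
        ManagedLog log = logs.computeIfAbsent(topicPartition, tp -> new ManagedLog());
        if (!log.topicId.isPresent() && topicId.isPresent()) {
            log.topicId = topicId;
        }
        return log;
    }
}
```

The consistency check itself can stay where it is, as the comment suggests; this sketch only covers getting the ID into the newly created log.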
[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager
jolshan commented on a change in pull request #10282: URL: https://github.com/apache/kafka/pull/10282#discussion_r593330038 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition, } /** - * Checks if the topic ID provided in the request is consistent with the topic ID in the log. + * Checks if the topic ID received is consistent with the topic ID in the log. * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID. * - * @param requestTopicId the topic ID from the request - * @return true if the request topic id is consistent, false otherwise + * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records + * @return true if the received topic id is consistent, false otherwise */ - def checkOrSetTopicId(requestTopicId: Uuid): Boolean = { -// If the request had an invalid topic ID, then we assume that topic IDs are not supported. -// The topic ID was not inconsistent, so return true. -// If the log is empty, then we can not say that topic ID is inconsistent, so return true. -if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID) - true -else { + def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = { Review comment: Hmm. Looking into this further, it seems that the log is created in RaftReplicaManager when making a leader or a follower. This occurs in the `handleMetadataRecords` method (non-deferring case) and the `endMetadataChangeDeferral` method. Each requires a few methods to pass through before we get to `LogManager.getOrCreateLog`, which makes passing a topic ID through not ideal (especially since some of this code is shared with the ZK path), but not impossible. One thing I am wondering though is how I tested the changes. I think I assumed the log was already created, but this code suggests that the log will not be created until `makeLeaders/Followers` in the non-deferring case, and not created until `endMetadataChangeDeferral` in the deferring case. This means that where I set topic IDs now, we won't have a log on the first pass! So I think maybe it is worth it to pass the ID through these methods. I think I can keep the check of the topic ID where it is though.
[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager
jolshan commented on a change in pull request #10282: URL: https://github.com/apache/kafka/pull/10282#discussion_r593330038 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition, } /** - * Checks if the topic ID provided in the request is consistent with the topic ID in the log. + * Checks if the topic ID received is consistent with the topic ID in the log. * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID. * - * @param requestTopicId the topic ID from the request - * @return true if the request topic id is consistent, false otherwise + * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records + * @return true if the received topic id is consistent, false otherwise */ - def checkOrSetTopicId(requestTopicId: Uuid): Boolean = { -// If the request had an invalid topic ID, then we assume that topic IDs are not supported. -// The topic ID was not inconsistent, so return true. -// If the log is empty, then we can not say that topic ID is inconsistent, so return true. -if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID) - true -else { + def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = { Review comment: Hmm. Looking into this further, it seems that the log is created in RaftReplicaManager when making a leader or a follower. This occurs in the `handleMetadataRecords` method and the `endMetadataChangeDeferral` method. Each path goes through a few methods before reaching `LogManager.getOrCreateLog`, so passing a topic ID through them is not ideal (especially since some of this code is shared with the ZK path), but it is not impossible. One thing I am wondering, though, is how I tested the changes.
I think I assumed the log had already been created, but this code suggests that the log will not be created until `makeLeaders/Followers` in the non-deferring case, and not until `endMetadataChangeDeferral` in the deferring case. This means that where I currently set topic IDs, we won't have a log on the first pass! So I think it may be worth passing the ID through these methods. I think I can keep the topic ID check where it is, though.
[GitHub] [kafka] guozhangwang merged pull request #10305: KAFKA-10357: Add missing repartition topic validation
guozhangwang merged pull request #10305: URL: https://github.com/apache/kafka/pull/10305
[GitHub] [kafka] mimaison commented on pull request #10308: MINOR: Update year in NOTICE
mimaison commented on pull request #10308: URL: https://github.com/apache/kafka/pull/10308#issuecomment-797619654 cc @ijuma @mjsax
[GitHub] [kafka] mimaison opened a new pull request #10308: MINOR: Update year in NOTICE
mimaison opened a new pull request #10308: URL: https://github.com/apache/kafka/pull/10308 This is required to run a release, as it is checked in release.py. This needs to be applied to 2.8, 2.7 and 2.6 too. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
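As a rough illustration of the kind of check the PR refers to, here is a hypothetical Java sketch that verifies a NOTICE text mentions a given year on a line starting with `Copyright`. The sample NOTICE contents and the `noticeHasYear` helper are assumptions; the actual check in release.py may work differently.

```java
import java.time.Year;

// Hypothetical sketch of a "NOTICE mentions the year" check; not release.py's code.
class NoticeYearCheck {
    // True if some line starting with "Copyright" contains the given year.
    static boolean noticeHasYear(String noticeText, int year) {
        String yearText = Integer.toString(year);
        for (String line : noticeText.split("\\R")) {
            if (line.startsWith("Copyright") && line.contains(yearText)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Assumed sample NOTICE contents, for illustration only.
        String notice = "Apache Kafka\nCopyright 2021 The Apache Software Foundation.\n";
        System.out.println(noticeHasYear(notice, 2021)); // true
        System.out.println(noticeHasYear(notice, 2020)); // false
        int current = Year.now().getValue();
        if (!noticeHasYear(notice, current)) {
            System.out.println("NOTICE does not mention " + current + "; update it before releasing");
        }
    }
}
```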
[GitHub] [kafka] askldjd commented on pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration
askldjd commented on pull request #8921: URL: https://github.com/apache/kafka/pull/8921#issuecomment-797598156 > @askldjd Good catch, you're right, it looks like this was only merged into 2.4. I've reopened [KAFKA-10160](https://issues.apache.org/jira/browse/KAFKA-10160). > Let's see if @satishbellapu has time to open a PR in the next few days. Otherwise I'll port this change next week. Thanks @mimaison. Really appreciate it.
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r593293236 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -80,15 +156,34 @@ public Errors error() { return Errors.forCode(data.errorCode()); } -public LinkedHashMap responseData() { +public LinkedHashMap responseData(Map topicNames, short version) { +if (version < 13) +return toResponseDataMap(); +return toResponseDataMap(topicNames); + +} + +// TODO: Should be replaced or cleaned up. The idea is that in KafkaApis we need to reconstruct responseData even though we could have just passed in and out a map. +// With topic IDs, recreating the map takes a little more time since we have to get the topic name from the topic ID to name map. +// The refactor somewhat helps in KafkaApis, but we have to recompute the map instead of just returning it. +// This is unsafe in test cases now (FetchSessionTest) where it used to be safe. Before it would just pull the responseData map. +// If we wanted to recompute with topicNames we could call responseData(topicNames) however, +// now it will just return the version computed here. + +// Used when we can guarantee responseData is populated with all possible partitions +// This occurs when we have a response version < 13 or we built the FetchResponse with +// responseDataMap as a parameter and we have the same topic IDs available. +public LinkedHashMap resolvedResponseData() { Review comment: I suppose we could set them server side, but when we iterate through the topics, we would need to exclude those whose topic IDs could not be resolved.
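The server-side approach mentioned in the comment (resolve topic IDs to names and leave out IDs that cannot be resolved) might look roughly like the following Java sketch. `TopicEntry` and `resolve` are hypothetical simplifications: real fetch responses key data by `TopicPartition` and carry per-partition data, whereas this sketch maps a topic name to a list of partition indexes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: these types are illustrative, not FetchResponse's real classes.
class TopicIdResolutionSketch {
    // A topic entry as it might appear in a v13+ response: identified by topic ID only.
    static final class TopicEntry {
        final UUID topicId;
        final List<Integer> partitions;

        TopicEntry(UUID topicId, List<Integer> partitions) {
            this.topicId = topicId;
            this.partitions = partitions;
        }
    }

    // Builds a name-keyed view in iteration order, skipping topics whose IDs have no known name.
    static LinkedHashMap<String, List<Integer>> resolve(List<TopicEntry> topics, Map<UUID, String> topicNames) {
        LinkedHashMap<String, List<Integer>> resolved = new LinkedHashMap<>();
        for (TopicEntry topic : topics) {
            String name = topicNames.get(topic.topicId);
            if (name == null) {
                continue; // unresolved ID: leave it out rather than guess a name
            }
            resolved.put(name, topic.partitions);
        }
        return resolved;
    }

    public static void main(String[] args) {
        UUID known = UUID.randomUUID();
        UUID unknown = UUID.randomUUID();
        Map<UUID, String> names = new HashMap<>();
        names.put(known, "orders");
        List<TopicEntry> topics = new ArrayList<>();
        topics.add(new TopicEntry(known, Arrays.asList(0, 1)));
        topics.add(new TopicEntry(unknown, Arrays.asList(0)));
        System.out.println(resolve(topics, names)); // {orders=[0, 1]}
    }
}
```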
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r593283525 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -80,15 +156,34 @@ public Errors error() { return Errors.forCode(data.errorCode()); } -public LinkedHashMap responseData() { +public LinkedHashMap responseData(Map topicNames, short version) { +if (version < 13) +return toResponseDataMap(); +return toResponseDataMap(topicNames); + +} + +// TODO: Should be replaced or cleaned up. The idea is that in KafkaApis we need to reconstruct responseData even though we could have just passed in and out a map. +// With topic IDs, recreating the map takes a little more time since we have to get the topic name from the topic ID to name map. +// The refactor somewhat helps in KafkaApis, but we have to recompute the map instead of just returning it. +// This is unsafe in test cases now (FetchSessionTest) where it used to be safe. Before it would just pull the responseData map. +// If we wanted to recompute with topicNames we could call responseData(topicNames) however, +// now it will just return the version computed here. + +// Used when we can guarantee responseData is populated with all possible partitions +// This occurs when we have a response version < 13 or we built the FetchResponse with +// responseDataMap as a parameter and we have the same topic IDs available. +public LinkedHashMap resolvedResponseData() { Review comment: Got it. I think the issue here, then, is that some of the information cannot be part of the auto-generated data. We need topic names for certain methods, but names will not be in the auto-generated data.
[GitHub] [kafka] ijuma commented on pull request #10285: KAFKA-12442: Upgrade ZSTD JNI from 1.4.8-4 to 1.4.9-1
ijuma commented on pull request #10285: URL: https://github.com/apache/kafka/pull/10285#issuecomment-797576186 @chia7712 worth cherry-picking to `2.8` if the aim is to align the versions with other projects that are releasing soon.
[GitHub] [kafka] ijuma commented on a change in pull request #10203: KAFKA-12415: Prepare for Gradle 7.0 and restrict transitive scope for non api dependencies
ijuma commented on a change in pull request #10203: URL: https://github.com/apache/kafka/pull/10203#discussion_r593271376 ## File path: README.md ## @@ -69,10 +69,6 @@ Generate coverage for a single module, i.e.: ### Building a binary release gzipped tar ball ### ./gradlew clean releaseTarGz -The above command will fail if you haven't set up the signing key. To bypass signing the artifact, you can run: - -./gradlew clean releaseTarGz -x signArchives Review comment: We figured it out: the build was using a non-snapshot version. Passing `-PskipSigning=true` fixed it. Submitted https://github.com/apache/kafka/pull/10307 to make it easier to debug these issues in the future.
[GitHub] [kafka] ijuma opened a new pull request #10307: MINOR: Log project, gradle, java and scala versions at the start of the build
ijuma opened a new pull request #10307: URL: https://github.com/apache/kafka/pull/10307 This is useful when debugging build issues. I also removed two printlns that are now redundant, so this makes the build more informative and less noisy at the same time. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-7525) Handling corrupt records
[ https://issues.apache.org/jira/browse/KAFKA-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300379#comment-17300379 ] Alexandre Dupriez commented on KAFKA-7525: -- It was fixed in 2.6.0: KAFKA-9206 > Handling corrupt records > > > Key: KAFKA-7525 > URL: https://issues.apache.org/jira/browse/KAFKA-7525 > Project: Kafka > Issue Type: Improvement > Components: consumer, core >Affects Versions: 1.1.0 >Reporter: Katarzyna Solnica >Priority: Major > > When Java consumer encounters a corrupt record on a partition it reads from, > it throws: > {code:java} > org.apache.kafka.common.KafkaException: Received exception when fetching the > next record from XYZ. If needed, please seek past the record to continue > consumption. > at > org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1125) > at > org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > (...) > Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size > is less than the minimum record overhead (14){code} > or: > {code:java} > java.lang.IllegalStateException: Unexpected error code 2 while fetching data > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:936) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > (...){code} > 1. 
Could you consider throwing CorruptRecordException from > parseCompletedFetch() when error == Errors.CORRUPT_MESSAGE? > 2. Seeking past the corrupt record means losing data. I've noticed that the > record is often correct on a follower ISR, and manually changing the partition > leader to the follower node solves the issue when the partition is used by a > single consumer group. Couldn't the Kafka server detect such situations and > recover corrupt records from logs available on other ISRs somehow? >
[GitHub] [kafka] ijuma commented on a change in pull request #10203: KAFKA-12415: Prepare for Gradle 7.0 and restrict transitive scope for non api dependencies
ijuma commented on a change in pull request #10203: URL: https://github.com/apache/kafka/pull/10203#discussion_r593221711 ## File path: README.md ## @@ -69,10 +69,6 @@ Generate coverage for a single module, i.e.: ### Building a binary release gzipped tar ball ### ./gradlew clean releaseTarGz -The above command will fail if you haven't set up the signing key. To bypass signing the artifact, you can run: - -./gradlew clean releaseTarGz -x signArchives Review comment: Note that your command has `install` while the README line I deleted did not (and was wrong). It is still weird that you're seeing this for snapshot builds. I'll work with you all offline to try and figure it out.
[GitHub] [kafka] mimaison commented on a change in pull request #10299: KAFKA-10070: parameterize Connect unit tests to remove code duplication
mimaison commented on a change in pull request #10299: URL: https://github.com/apache/kafka/pull/10299#discussion_r593134903 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/ParameterizedTest.java ## @@ -0,0 +1,67 @@ +package org.apache.kafka.connect.util; Review comment: We're missing the license header here. ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java ## @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime; +import java.util.Collection; Review comment: Should we move this with the other `java.util` imports below? ## File path: checkstyle/suppressions.xml ## @@ -158,7 +158,7 @@ files="StreamThread.java"/> + files="(KStreamImpl|KTableImpl|WorkerSourceTaskTest).java"/> Review comment: Instead of adding this to Kafka Streams rules, can we move it to the Connect section just above? ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/ParameterizedTest.java ## @@ -0,0 +1,67 @@ +package org.apache.kafka.connect.util; + +import java.lang.annotation.Annotation; +import org.junit.runner.Description; +import org.junit.runner.manipulation.Filter; +import org.junit.runner.manipulation.NoTestsRemainException; +import org.junit.runners.Parameterized; + +/** + * Running a single parameterized test causes issue as explained in + * http://youtrack.jetbrains.com/issue/IDEA-65966 and + * https://stackoverflow.com/questions/12798079/initializationerror-with-eclipse-and-junit4-when-executing-a-single-test/18438718#18438718 + * + * As a workaround, the original filter needs to be wrapped and then pass it a deparameterized + * description which removes the parameter part (See deparametrizeName) + */ +public class ParameterizedTest extends Parameterized { + + public ParameterizedTest (Class klass) throws Throwable { Review comment: Checkstyle fails because it expects an indentation of 4 spaces.
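The workaround described in the `ParameterizedTest` Javadoc strips the parameter part from a test's description before the original filter sees it. Below is a minimal, hypothetical sketch of just the name-stripping step, assuming JUnit 4's default `method[index](Class)` display names; `DeparameterizeSketch.deparameterize` is an illustrative name, not the `deparametrizeName` method from the PR, and it does not touch JUnit's `Description` or `Filter` types.

```java
import java.util.regex.Pattern;

// Hypothetical helper; not the deparametrizeName method from the PR.
class DeparameterizeSketch {
    // Matches a bracketed parameter part such as "[0]" or "[1: true]".
    private static final Pattern PARAMETER_PART = Pattern.compile("\\[[^\\]]*\\]");

    // Assumption: display names follow JUnit 4's default "method[index](Class)" pattern.
    static String deparameterize(String displayName) {
        return PARAMETER_PART.matcher(displayName).replaceAll("");
    }

    public static void main(String[] args) {
        System.out.println(deparameterize("testFoo[0](com.example.FooTest)")); // testFoo(com.example.FooTest)
    }
}
```

A regex over the display name is the simplest option here; it would misbehave if a parameter label itself contained `]`.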
[GitHub] [kafka] mimaison merged pull request #10268: MINOR: output meaningful error message
mimaison merged pull request #10268: URL: https://github.com/apache/kafka/pull/10268
[GitHub] [kafka] mimaison commented on pull request #10268: MINOR: output meaningful error message
mimaison commented on pull request #10268: URL: https://github.com/apache/kafka/pull/10268#issuecomment-797445589 We got to see it in action immediately, as the tests failed in the [JDK8 build](https://github.com/apache/kafka/pull/10268/checks?check_run_id=2037055584): ``` java.lang.AssertionError: Connector MirrorCheckpointConnector tasks did not start in time on cluster: primary-connect-cluster ```
[GitHub] [kafka] mimaison merged pull request #10192: MINOR: Add missing unit tests for Mirror Connect
mimaison merged pull request #10192: URL: https://github.com/apache/kafka/pull/10192
[jira] [Updated] (KAFKA-10160) Kafka MM2 consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-10160: --- Fix Version/s: (was: 2.7.0) 2.4.2 > Kafka MM2 consumer configuration > > > Key: KAFKA-10160 > URL: https://issues.apache.org/jira/browse/KAFKA-10160 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0, 2.4.1 >Reporter: Pavol Ipoth >Assignee: sats >Priority: Major > Labels: configuration, kafka, mirror-maker > Fix For: 2.4.2 > > > [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51,] > according to this, producer/consumer-level properties should be configured as > e.g. somesource->sometarget.consumer.client.id. I tried to set > somesource->sometarget.consumer.auto.offset.reset=latest, but without > success: the consumer always tries to fetch from the earliest offset. I'm not sure whether this is a bug or my > misconfiguration, but at least an update to the docs would be useful.
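The report expects `<source>-><target>.consumer.*` entries to reach the mirrored consumer. A hypothetical Java sketch of how such prefix-based overrides can be collected and stripped; `FlowConfigSketch.consumerOverrides` is not `MirrorMakerConfig`'s actual code, and real MM2 configuration handling involves more layers than shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of prefix-based overrides; not MirrorMakerConfig's actual code.
class FlowConfigSketch {
    // Collects "<source>-><target>.consumer.*" entries and strips the prefix.
    static Map<String, String> consumerOverrides(Map<String, String> props, String source, String target) {
        String prefix = source + "->" + target + ".consumer.";
        Map<String, String> overrides = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                overrides.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return overrides;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("somesource->sometarget.consumer.auto.offset.reset", "latest");
        props.put("somesource->sometarget.producer.client.id", "p1");
        System.out.println(consumerOverrides(props, "somesource", "sometarget")); // {auto.offset.reset=latest}
    }
}
```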
[GitHub] [kafka] mimaison commented on pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration
mimaison commented on pull request #8921: URL: https://github.com/apache/kafka/pull/8921#issuecomment-797420022 @askldjd Good catch, you're right, it looks like this was only merged into 2.4. I've reopened [KAFKA-10160](https://issues.apache.org/jira/browse/KAFKA-10160). Let's see if @satishbellapu has time to open a PR in the next few days. Otherwise I'll port this change next week.
[jira] [Commented] (KAFKA-10160) Kafka MM2 consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300227#comment-17300227 ] Mickael Maison commented on KAFKA-10160: Reopening this issue as it seems it was only fixed in the 2.4 branch. [~sbellapu] can you open a PR against trunk? > Kafka MM2 consumer configuration > > > Key: KAFKA-10160 > URL: https://issues.apache.org/jira/browse/KAFKA-10160 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0, 2.4.1 >Reporter: Pavol Ipoth >Assignee: sats >Priority: Major > Labels: configuration, kafka, mirror-maker > Fix For: 2.4.2 > > > [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51,] > according to this, producer/consumer-level properties should be configured as > e.g. somesource->sometarget.consumer.client.id. I tried to set > somesource->sometarget.consumer.auto.offset.reset=latest, but without > success: the consumer always tries to fetch from the earliest offset. I'm not sure whether this is a bug or my > misconfiguration, but at least an update to the docs would be useful.
[jira] [Reopened] (KAFKA-10160) Kafka MM2 consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reopened KAFKA-10160: > Kafka MM2 consumer configuration > > > Key: KAFKA-10160 > URL: https://issues.apache.org/jira/browse/KAFKA-10160 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0, 2.4.1 >Reporter: Pavol Ipoth >Assignee: sats >Priority: Major > Labels: configuration, kafka, mirror-maker > Fix For: 2.7.0 > > > [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51,] > according to this, producer/consumer-level properties should be configured as > e.g. somesource->sometarget.consumer.client.id. I tried to set > somesource->sometarget.consumer.auto.offset.reset=latest, but without > success: the consumer always tries to fetch from the earliest offset. I'm not sure whether this is a bug or my > misconfiguration, but at least an update to the docs would be useful.
[jira] [Updated] (KAFKA-12449) Remove deprecated WindowStore#put
[ https://issues.apache.org/jira/browse/KAFKA-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jorge Esteban Quilcate Otoya updated KAFKA-12449: - Affects Version/s: 3.0.0 > Remove deprecated WindowStore#put > - > > Key: KAFKA-12449 > URL: https://issues.apache.org/jira/browse/KAFKA-12449 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 3.0.0 >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > > Related to KIP-474: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545]
[GitHub] [kafka] jeqo edited a comment on pull request #10293: KAFKA-12449: Remove deprecated WindowStore#put
jeqo edited a comment on pull request #10293: URL: https://github.com/apache/kafka/pull/10293#issuecomment-796958801 ~~Totally missed that this was proposed and accepted in KIP-474. I hope I'm not stepping on an existing implementation.~~ Just realized the KIP is only about deprecating this method, and this PR completes it with the removal. Turned the sub-task into a task in JIRA and linked it to the right KIP.
[jira] [Updated] (KAFKA-12449) Remove deprecated WindowStore#put
[ https://issues.apache.org/jira/browse/KAFKA-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jorge Esteban Quilcate Otoya updated KAFKA-12449: - Parent: (was: KAFKA-10434) Issue Type: Task (was: Sub-task) > Remove deprecated WindowStore#put > - > > Key: KAFKA-12449 > URL: https://issues.apache.org/jira/browse/KAFKA-12449 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > > Related to KIP-474: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545]
[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata
vamossagar12 commented on a change in pull request #9756: URL: https://github.com/apache/kafka/pull/9756#discussion_r593033011 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -76,6 +76,11 @@ "wait for writes to accumulate before flushing them to disk."; Review comment: I updated the doc for the maxUnflushedBytes config.
[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata
vamossagar12 commented on a change in pull request #9756: URL: https://github.com/apache/kafka/pull/9756#discussion_r593032627 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -76,6 +76,11 @@ "wait for writes to accumulate before flushing them to disk."; public static final int DEFAULT_QUORUM_LINGER_MS = 25; +public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes"; Review comment: I see what you are trying to say. Well, the original premise of this ticket was to trigger an fsync the moment a configured number of bytes has accumulated. Here is Jason's original description in the ticket: > In KAFKA-10601, we implemented linger semantics similar to the producer to let the leader accumulate a batch of writes before fsyncing them to disk. Currently the fsync is only based on the linger time, but it would be helpful to make it size-based as well. In other words, if we accumulate a configurable N bytes, then we should not wait for linger expiration and should just fsync immediately. But as you pointed out, it is also because batch append and fsync go hand in hand in the current implementation. With the future implementation of deferred fsync, this might only affect the batch appends, and with that in mind, IMO it makes sense to rename it to `append.linger.bytes`. It also matches `append.linger.ms`. BTW, on the fsync deferral track, I have created a draft PR that outlines my approach (https://github.com/apache/kafka/pull/10278). Could you or Jason review it?
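Jason's description boils down to draining pending writes when either the linger time elapses or a byte threshold is reached, whichever comes first. A minimal Java sketch of that policy, using hypothetical names (`LingerPolicySketch`, `append`, `shouldDrain`, `drained`); it is not `BatchAccumulator` or `KafkaRaftClient` code. The 25 ms and 120 byte values in `main` echo `DEFAULT_QUORUM_LINGER_MS` and the test's `minFlushSizeInBytes`.

```java
// Hypothetical sketch of "drain on linger time OR accumulated bytes";
// not BatchAccumulator's or KafkaRaftClient's actual code.
class LingerPolicySketch {
    private final long lingerMs;
    private final int lingerBytes;   // e.g. the proposed append.linger.bytes
    private long firstAppendMs = -1; // -1 means nothing is pending
    private int pendingBytes = 0;

    LingerPolicySketch(long lingerMs, int lingerBytes) {
        this.lingerMs = lingerMs;
        this.lingerBytes = lingerBytes;
    }

    // Records an append and reports whether pending writes should be drained now.
    boolean append(int sizeInBytes, long nowMs) {
        if (firstAppendMs < 0) {
            firstAppendMs = nowMs;
        }
        pendingBytes += sizeInBytes;
        return shouldDrain(nowMs);
    }

    // Drain when the byte threshold is hit or the oldest pending write has lingered long enough.
    boolean shouldDrain(long nowMs) {
        if (firstAppendMs < 0) {
            return false;
        }
        return pendingBytes >= lingerBytes || nowMs - firstAppendMs >= lingerMs;
    }

    // Called after the pending writes have been drained.
    void drained() {
        firstAppendMs = -1;
        pendingBytes = 0;
    }

    public static void main(String[] args) {
        LingerPolicySketch policy = new LingerPolicySketch(25, 120);
        System.out.println(policy.append(50, 0));   // false: 50 bytes, 0 ms elapsed
        System.out.println(policy.append(80, 1));   // true: 130 >= 120 bytes before linger expires
        policy.drained();
        System.out.println(policy.append(10, 5));   // false
        System.out.println(policy.shouldDrain(30)); // true: 25 ms have elapsed since the first append
    }
}
```

Measuring linger from the first pending append, rather than the latest one, bounds how long any single write can wait before being drained.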
[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata
vamossagar12 commented on a change in pull request #9756: URL: https://github.com/apache/kafka/pull/9756#discussion_r593024705 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -474,6 +474,7 @@ public void testAccumulatorClearedAfterBecomingFollower() throws Exception { .thenReturn(buffer); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) +.withMaxUnflushedBytes(KafkaRaftClient.MAX_BATCH_SIZE) Review comment: Yeah. The reason is that in this PR @hachikuji suggested changing the logic for setting maxBatchSize in BatchAccumulator as follows: `this.maxBatchSize = Math.min(maxBatchSize, maxUnflushedBytes);` That is why I am testing a few combinations of values to check that it behaves correctly.
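The suggested rule caps the batch size by the unflushed-bytes limit, which is why testing several value combinations is useful. A tiny Java sketch that prints the effective size for a few limits; `EffectiveBatchSizeSketch` is a hypothetical name, and the 8 MiB `maxBatchSize` is an assumed value for illustration, not necessarily `KafkaRaftClient.MAX_BATCH_SIZE`.

```java
// Hypothetical sketch of the suggested rule:
// this.maxBatchSize = Math.min(maxBatchSize, maxUnflushedBytes);
class EffectiveBatchSizeSketch {
    static int effectiveMaxBatchSize(int maxBatchSize, int maxUnflushedBytes) {
        return Math.min(maxBatchSize, maxUnflushedBytes);
    }

    public static void main(String[] args) {
        int maxBatchSize = 8 * 1024 * 1024; // assumed value for illustration
        int[] unflushedLimits = {120, 1024, maxBatchSize, Integer.MAX_VALUE};
        for (int limit : unflushedLimits) {
            // A small unflushed-bytes limit shrinks batches; a large one leaves maxBatchSize in charge.
            System.out.println(limit + " -> " + effectiveMaxBatchSize(maxBatchSize, limit));
        }
    }
}
```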
[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata
vamossagar12 commented on a change in pull request #9756: URL: https://github.com/apache/kafka/pull/9756#discussion_r593019808 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -617,6 +620,38 @@ public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except assertEquals(3L, context.log.endOffset().offset); } +@Test +public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws Exception { +// This test verifies that the client will get woken up immediately +// if the linger timeout has expired during an append + +int localId = 0; +int otherNodeId = 1; +int minFlushSizeInBytes = 120; +Set voters = Utils.mkSet(localId, otherNodeId); + +RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) +.withMaxUnflushedBytes(minFlushSizeInBytes) Review comment: Yeah, that was an oversight as well. Changed.
[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata
vamossagar12 commented on a change in pull request #9756: URL: https://github.com/apache/kafka/pull/9756#discussion_r593019665 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -617,6 +620,38 @@ public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except assertEquals(3L, context.log.endOffset().offset); } +@Test +public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws Exception { +// This test verifies that the client will get woken up immediately +// if the linger timeout has expired during an append Review comment: Yes, changed.
[GitHub] [kafka] vamossagar12 commented on a change in pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata
vamossagar12 commented on a change in pull request #9756: URL: https://github.com/apache/kafka/pull/9756#discussion_r593019531 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java ## @@ -1078,6 +1078,7 @@ private static FetchResponseData snapshotFetchResponse( private static SnapshotWriter snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) { return new SnapshotWriter<>( snapshot, +1024, Review comment: done
[jira] [Resolved] (KAFKA-12431) Fetch Request/Response without Topic information
[ https://issues.apache.org/jira/browse/KAFKA-12431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Sinoros-Szabo resolved KAFKA-12431. - Resolution: Workaround Closing this ticket, as it seems to work fine with the proper settings. > Fetch Request/Response without Topic information > > > Key: KAFKA-12431 > URL: https://issues.apache.org/jira/browse/KAFKA-12431 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.1 >Reporter: Peter Sinoros-Szabo >Priority: Major > Attachments: fetch-on-2.4.1.png, fetch-on-2.6.1.png, > kafka-highcpu-24.svg.zip, kafka-highcpu-26.svg.zip > > > I was running a 6-node Kafka 2.4.1 cluster with the protocol and message format > version set to 2.4. I wanted to upgrade the cluster to 2.6.1, and after I > upgraded the first broker to 2.6.1 without any configuration change, I noticed > much higher CPU usage on that broker (~350% instead of 25%) > and about 3-4x higher network traffic. So I dumped the traffic between the > Kafka client and broker and compared it with the traffic of the same broker > downgraded to 2.4.1. > It seems to me that after I upgraded to 2.6.1, the Fetch requests and > responses are incomplete: they are missing the topics part of the Fetch > Request, and I don't know why. I guess there should always be a > topics part. > I'll attach a screenshot of these messages.
[jira] [Commented] (KAFKA-12373) Improve KafkaRaftClient handling of graceful shutdown
[ https://issues.apache.org/jira/browse/KAFKA-12373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17300129#comment-17300129 ] Sagar Rao commented on KAFKA-12373: --- Thanks [~hachikuji], I've assigned it to myself. > Improve KafkaRaftClient handling of graceful shutdown > - > > Key: KAFKA-12373 > URL: https://issues.apache.org/jira/browse/KAFKA-12373 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Assignee: Sagar Rao >Priority: Major > > The current implementation simply closes the metrics group when it is closed. > When closing the KafkaRaftClient that is the leader, it should perform at > least the following steps: > # Stop accepting new schedule append operations > # Append to the log the batches currently in the BatchAccumulator > # Wait with a timeout for the high-watermark to reach the LEO > # Cooperatively resign as leader from the quorum
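The four shutdown steps in the description can be sketched as a single routine. This is a minimal Java sketch under assumptions: `LeaderState`, `stopAcceptingAppends`, `flushAccumulatedBatches`, `highWatermark`, `logEndOffset` and `resign` are hypothetical names, not the actual `KafkaRaftClient` API, and the bounded wait in step 3 is a simple poll loop rather than whatever mechanism the eventual implementation uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of the four shutdown steps listed in KAFKA-12373.
// LeaderState and its methods are illustrative, not KafkaRaftClient's API.
public class LeaderShutdownSketch {

    public interface LeaderState {
        void stopAcceptingAppends();      // step 1: reject new scheduled appends
        void flushAccumulatedBatches();   // step 2: append buffered batches to the log
        long highWatermark();
        long logEndOffset();
        void resign();                    // step 4: cooperatively give up leadership
    }

    // Returns true if the high-watermark reached the LEO before the timeout.
    public static boolean shutdown(LeaderState leader, long timeoutMs, LongSupplier clockMs) {
        leader.stopAcceptingAppends();
        leader.flushAccumulatedBatches();

        // Step 3: wait (bounded by timeoutMs) for the high-watermark to reach the LEO.
        long deadlineMs = clockMs.getAsLong() + timeoutMs;
        boolean caughtUp = false;
        while (true) {
            if (leader.highWatermark() >= leader.logEndOffset()) {
                caughtUp = true;
                break;
            }
            if (clockMs.getAsLong() >= deadlineMs) {
                break;
            }
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }

        // In this sketch the leader resigns even if the wait timed out.
        leader.resign();
        return caughtUp;
    }

    public static void main(String[] args) {
        List<String> steps = new ArrayList<>();
        LeaderState fake = new LeaderState() {
            private long hw = 5L;  // advances by one on each poll
            public void stopAcceptingAppends() { steps.add("stop"); }
            public void flushAccumulatedBatches() { steps.add("flush"); }
            public long highWatermark() { return hw++; }
            public long logEndOffset() { return 8L; }
            public void resign() { steps.add("resign"); }
        };
        boolean caughtUp = shutdown(fake, 1_000L, System::currentTimeMillis);
        System.out.println(caughtUp + " " + steps); // true [stop, flush, resign]
    }
}
```

Resigning after the wait regardless of the outcome is one possible reading of the ticket; the return value only reports whether the high-watermark caught up with the log end offset in time.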
[jira] [Assigned] (KAFKA-12373) Improve KafkaRaftClient handling of graceful shutdown
[ https://issues.apache.org/jira/browse/KAFKA-12373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-12373: - Assignee: Sagar Rao