[GitHub] [kafka] lihaosky commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment
lihaosky commented on code in PR #14097: URL: https://github.com/apache/kafka/pull/14097#discussion_r1277119690

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ##

@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
                              final AssignmentConfigs assignmentConfigs) {
         this.fullMetadata = fullMetadata;
         this.partitionsForTask = partitionsForTask;
-        this.racksForProcessConsumer = racksForProcessConsumer;
         this.internalTopicManager = internalTopicManager;
         this.assignmentConfigs = assignmentConfigs;
         this.racksForPartition = new HashMap<>();
         this.racksForProcess = new HashMap<>();
+        validateClientRack(racksForProcessConsumer);

Review Comment:
Yeah. Reverted and introduced `validClientRack` to track it.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd merged pull request #14049: KAFKA-14038: Optimise calculation of size for log in remote tier
satishd merged PR #14049: URL: https://github.com/apache/kafka/pull/14049
[GitHub] [kafka] satishd commented on pull request #14049: KAFKA-14038: Optimise calculation of size for log in remote tier
satishd commented on PR #14049: URL: https://github.com/apache/kafka/pull/14049#issuecomment-1655051965

A few unrelated test failures in Jenkins jobs, merging it to trunk.
[jira] [Comment Edited] (KAFKA-15231) Add ability to pause/resume Remote Log Manager tasks
[ https://issues.apache.org/jira/browse/KAFKA-15231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748406#comment-17748406 ]

Satish Duggana edited comment on KAFKA-15231 at 7/28/23 4:48 AM:
-----------------------------------------------------------------
There can be scenarios where you want more control over remote read/write operations. One way to control the respective read and write throughputs is to set respective quotas. This will be addressed with https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas which is in Draft state.

was (Author: satish.duggana):
There can be scenarios where you want more control over remote read/write operations. One way to control the respective read and write throughputs is to set respective quotas. This will be addressed with https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas

> Add ability to pause/resume Remote Log Manager tasks
> ----------------------------------------------------
>
> Key: KAFKA-15231
> URL: https://issues.apache.org/jira/browse/KAFKA-15231
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Jorge Esteban Quilcate Otoya
> Priority: Major
> Labels: tiered-storage
>
> Once Tiered Storage is enabled, there may be situations where it is needed to pause uploading tasks to a remote tier, e.g. remote storage maintenance, troubleshooting, etc.
> An RSM implementation may not be able to do this by itself without throwing exceptions, polluting the logs, etc.
> Could we consider adding this ability to the Tiered Storage framework? Remote Log Manager seems like a good candidate place for this, though I'm wondering how to expose it.
> Would be interested to hear if this sounds like a good idea, and what options we have to include these.
> We have been considering extending RLM tasks with a pause flag, and having an MBean to switch them on demand. Another option may be to extend the Kafka protocol to expose this – but that seems much more involved.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
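The pause-flag-plus-MBean idea floated in the ticket could look roughly like the following. This is a minimal, self-contained sketch; the class and method names are hypothetical, not actual Remote Log Manager internals.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of a pausable upload task: the loop skips its remote-copy work
// while paused instead of throwing and polluting the logs.
public class PausableUploadTask {
    private final AtomicBoolean paused = new AtomicBoolean(false);
    private int uploadedSegments = 0;

    // Would be invoked periodically by the RLM scheduler thread.
    void runOnce() {
        if (paused.get()) {
            return; // silently skip remote copies while paused
        }
        uploadedSegments++; // stand-in for copying one log segment
    }

    // Could be wired to an MBean attribute to toggle on demand.
    void setPaused(final boolean value) {
        paused.set(value);
    }

    int uploadedSegments() {
        return uploadedSegments;
    }

    public static void main(final String[] args) {
        final PausableUploadTask task = new PausableUploadTask();
        task.runOnce();          // copies a segment
        task.setPaused(true);
        task.runOnce();          // skipped while paused
        task.setPaused(false);
        task.runOnce();          // copies again
        System.out.println(task.uploadedSegments()); // prints 2
    }
}
```

An `AtomicBoolean` keeps the toggle safe to flip from a JMX thread while the scheduler thread reads it.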
[jira] [Updated] (KAFKA-15265) Remote copy/fetch quotas for tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-15265:
-----------------------------------
Description: Related KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas

> Remote copy/fetch quotas for tiered storage.
> --------------------------------------------
>
> Key: KAFKA-15265
> URL: https://issues.apache.org/jira/browse/KAFKA-15265
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Satish Duggana
> Assignee: Abhijeet Kumar
> Priority: Major
>
> Related KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
[jira] [Created] (KAFKA-15265) Remote copy/fetch quotas for tiered storage.
Satish Duggana created KAFKA-15265:
-----------------------------------
Summary: Remote copy/fetch quotas for tiered storage.
Key: KAFKA-15265
URL: https://issues.apache.org/jira/browse/KAFKA-15265
Project: Kafka
Issue Type: Improvement
Components: core
Reporter: Satish Duggana
Assignee: Abhijeet Kumar
[jira] [Commented] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled
[ https://issues.apache.org/jira/browse/KAFKA-15214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748402#comment-17748402 ]

Satish Duggana commented on KAFKA-15214:
----------------------------------------
[~yaolixin] Is the client trying to fetch the remote data and encountering OffsetOutOfRangeException? What offset was the client trying to fetch? We should not count OffsetOutOfRangeException errors in RemoteReadErrorsPerSec, because OffsetOutOfRangeException is a valid error and it may not be related to any errors while fetching the remote reads. We can add more logs if needed for better clarity on the errors encountered.

> Add metrics for OffsetOutOfRangeException when tiered storage is enabled
> -------------------------------------------------------------------------
>
> Key: KAFKA-15214
> URL: https://issues.apache.org/jira/browse/KAFKA-15214
> Project: Kafka
> Issue Type: Sub-task
> Components: metrics
> Affects Versions: 3.6.0
> Reporter: Lixin Yao
> Priority: Minor
> Labels: KIP-405
> Fix For: 3.6.0
>
> The current metric RemoteReadErrorsPerSec does not include the exception type OffsetOutOfRangeException.
> In our testing of the tiered storage feature (at Apple), we noticed several cases where remote download was affected and stuck due to repeated OffsetOutOfRangeException in some particular brokers or topic partitions. The root cause could vary, but currently, without a metric, it is very hard to catch this issue and debug it in a timely fashion. It is understandable that the exception itself may not be the root cause, but this exception metric could be a good signal for us to alert on and investigate.
> Related discussion: https://github.com/apache/kafka/pull/13944#discussion_r1266243006
> I am happy to contribute to this if the request is agreed.
[GitHub] [kafka] github-actions[bot] commented on pull request #13568: KAFKA-14906:Extract the coordinator service log from server log
github-actions[bot] commented on PR #13568: URL: https://github.com/apache/kafka/pull/13568#issuecomment-1654936322

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment
mjsax commented on code in PR #14097: URL: https://github.com/apache/kafka/pull/14097#discussion_r1277018411

## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java: ##

@@ -313,20 +315,145 @@ public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
         );
     }
 
+    @Test
+    public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() {
+        final RackAwareTaskAssignor rackAwareTaskAssignor = mock(RackAwareTaskAssignor.class);
+        when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true);
+        final Map<UUID, String> racksForProcess = mkMap(
+            mkEntry(UUID_1, "rack1"),
+            mkEntry(UUID_2, "rack2"),
+            mkEntry(UUID_3, "rack3"),
+            mkEntry(UUID_4, "rack1"),
+            mkEntry(UUID_5, "rack2"),
+            mkEntry(UUID_6, "rack3"),
+            mkEntry(UUID_7, "rack1"),
+            mkEntry(UUID_8, "rack2"),
+            mkEntry(UUID_9, "rack3")
+        );
+        when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess);
+        final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2);
+        standbyTaskAssignor = StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor);
+        verify(rackAwareTaskAssignor, times(1)).racksForProcess();
+
+        final Map<UUID, ClientState> clientStates = mkMap(
+            mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(), TASK_0_0, TASK_1_0)),
+            mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(), TASK_0_1, TASK_1_1)),
+            mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(), TASK_0_2, TASK_1_2)),
+
+            mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())),
+            mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())),
+            mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())),
+
+            mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())),
+            mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap())),
+            mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap()))
+        );
+
+        final Map<UUID, ClientState> clientStatesWithTags = mkMap(
+            mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)),
+            mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)),
+            mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)),
+
+            mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+            mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+            mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3)))),
+
+            mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+            mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+            mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap(mkEntry(ZONE_TAG, ZONE_3))))
+        );
+
+        final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
+
+        standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
+
+        final AssignmentConfigs assignmentConfigsWithTags = newAssignmentConfigs(2, ZONE_TAG);
+        standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
+        standbyTaskAssignor.assign(clientStatesWithTags, allActiveTasks, allActiveTasks, assignmentConfigsWithTags);
+
+        Stream.of(clientStates, clientStatesWithTags).forEach(
+            cs -> {
+                assertTrue(cs.values().stream().allMatch(ClientState::reachedCapacity));
+                Stream.of(UUID_1, UUID_2, UUID_3)
+                    .forEach(client -> assertStandbyTaskCountForClientEqualsTo(cs, client, 0));
+                Stream.of(UUID_4, UUID_5, UUID_6, UUID_7, UUID_8, UUID_9)
+                    .forEach(client -> assertStandbyTaskCountForClientEqualsTo(cs, client, 2));
+                assertTotalNumberOfStandbyTasksEqualsTo(cs, 12);
+
+                assertTrue(
+                    standbyClientsHonorRackAwareness(
+                        TASK_0_0,
+                        cs,
+                        singletonList(
+                            mkSet(UUID_6, UUID_8)

Review Comment:
Thanks. So my understanding was actually correct.

Follow-up question: should we keep this test as tight as it is right now, or should we actually say we only verify that the algorithm does not pick anything in zone_1 (ie, not `UUID_1, 4, 7`), and thus we pass in all _valid_ UUIDs from all different zones instead, ie, we pass in `UUID_2, 5, 6, 8, 9`?
[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment
mjsax commented on code in PR #14097: URL: https://github.com/apache/kafka/pull/14097#discussion_r1277015051

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ##

@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
                              final AssignmentConfigs assignmentConfigs) {
         this.fullMetadata = fullMetadata;
         this.partitionsForTask = partitionsForTask;
-        this.racksForProcessConsumer = racksForProcessConsumer;
         this.internalTopicManager = internalTopicManager;
         this.assignmentConfigs = assignmentConfigs;
         this.racksForPartition = new HashMap<>();
         this.racksForProcess = new HashMap<>();
+        validateClientRack(racksForProcessConsumer);

Review Comment:
I think we should document that we might throw here, because when we create `RackAwareTaskAssignor` we need to catch this exception to ensure we do not kill the `StreamThread`.

Now that I am realizing this, I am actually wondering if it's a good idea to handle it this way? In the end, for the caller it might be better to figure out whether the `RackAwareTaskAssignor` can be used or not via a method call before we create it? Maybe we need some `static` method into which we pass `racksForProcessConsumer`? -- Any other idea to make it more natural for the caller? If we think handling the exception is fine for the caller, that is also ok with me. Just asking.
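The static-method alternative raised in this comment could look roughly like the sketch below. All names are illustrative, not the actual `RackAwareTaskAssignor` API, and the map shape for `racksForProcessConsumer` (process id -> consumer -> optional rack) is an assumption inferred from the discussion.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Sketch: validate the rack metadata up front with a static method, so the
// caller can fall back to a non-rack-aware assignor without catching an
// exception from the constructor.
public class RackValidationSketch {

    // Valid only if every consumer reports a rack, and all consumers of the
    // same process report the same rack.
    static boolean validateClientRack(final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer) {
        for (final Map<String, Optional<String>> consumerRacks : racksForProcessConsumer.values()) {
            String rack = null;
            for (final Optional<String> candidate : consumerRacks.values()) {
                if (!candidate.isPresent()) {
                    return false; // missing rack info disables rack awareness
                }
                if (rack == null) {
                    rack = candidate.get();
                } else if (!rack.equals(candidate.get())) {
                    return false; // inconsistent racks within one process
                }
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        final Map<UUID, Map<String, Optional<String>>> racks = new HashMap<>();
        final Map<String, Optional<String>> consumers = new HashMap<>();
        consumers.put("consumer-1", Optional.of("rack1"));
        racks.put(UUID.randomUUID(), consumers);
        System.out.println(validateClientRack(racks)); // prints true

        consumers.put("consumer-2", Optional.of("rack2")); // conflicting rack
        System.out.println(validateClientRack(racks)); // prints false
    }
}
```

With this shape, the caller would check `validateClientRack(...)` first and only construct the assignor when it returns true, keeping the constructor free of throwing validation.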
[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment
mjsax commented on code in PR #14097: URL: https://github.com/apache/kafka/pull/14097#discussion_r1277014726

## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ##

@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
                              final AssignmentConfigs assignmentConfigs) {
         this.fullMetadata = fullMetadata;
         this.partitionsForTask = partitionsForTask;
-        this.racksForProcessConsumer = racksForProcessConsumer;

Review Comment:
We can also remove `racksForProcessConsumer` from the parameter list.
[jira] [Updated] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter
[ https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jianbin.chen updated KAFKA-15264: - Attachment: image-2023-07-28-09-52-38-941.png Description: I was preparing to upgrade from 1.1.0 (ZooKeeper) to 3.5.1 in KRaft mode (new cluster deployment). In my recent comparison testing with the following stress-test command, the throughput gap is obvious:
{code:java}
./kafka-producer-perf-test.sh --topic test321 --num-records 3000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=xxx: acks=1
419813 records sent, 83962.6 records/sec (81.99 MB/sec), 241.1 ms avg latency, 588.0 ms max latency.
555300 records sent, 111015.6 records/sec (108.41 MB/sec), 275.1 ms avg latency, 460.0 ms max latency.
552795 records sent, 110536.9 records/sec (107.95 MB/sec), 265.9 ms avg latency, 1120.0 ms max latency.
552600 records sent, 110520.0 records/sec (107.93 MB/sec), 284.5 ms avg latency, 1097.0 ms max latency.
538500 records sent, 107656.9 records/sec (105.13 MB/sec), 277.5 ms avg latency, 610.0 ms max latency.
511545 records sent, 102309.0 records/sec (99.91 MB/sec), 304.1 ms avg latency, 1892.0 ms max latency.
511890 records sent, 102337.1 records/sec (99.94 MB/sec), 288.4 ms avg latency, 3000.0 ms max latency.
519165 records sent, 103812.2 records/sec (101.38 MB/sec), 262.1 ms avg latency, 1781.0 ms max latency.
513555 records sent, 102669.9 records/sec (100.26 MB/sec), 338.2 ms avg latency, 2590.0 ms max latency.
463329 records sent, 92665.8 records/sec (90.49 MB/sec), 276.8 ms avg latency, 1463.0 ms max latency.
494248 records sent, 98849.6 records/sec (96.53 MB/sec), 327.2 ms avg latency, 2362.0 ms max latency.
506272 records sent, 101254.4 records/sec (98.88 MB/sec), 322.1 ms avg latency, 2986.0 ms max latency.
393758 records sent, 78735.9 records/sec (76.89 MB/sec), 387.0 ms avg latency, 2958.0 ms max latency.
426435 records sent, 85252.9 records/sec (83.25 MB/sec), 363.3 ms avg latency, 1959.0 ms max latency.
412560 records sent, 82298.0 records/sec (80.37 MB/sec), 374.1 ms avg latency, 1995.0 ms max latency.
370137 records sent, 73997.8 records/sec (72.26 MB/sec), 396.8 ms avg latency, 1496.0 ms max latency.
391781 records sent, 78340.5 records/sec (76.50 MB/sec), 410.7 ms avg latency, 2446.0 ms max latency.
355901 records sent, 71166.0 records/sec (69.50 MB/sec), 397.5 ms avg latency, 2715.0 ms max latency.
385410 records sent, 77082.0 records/sec (75.28 MB/sec), 417.5 ms avg latency, 2702.0 ms max latency.
381160 records sent, 76232.0 records/sec (74.45 MB/sec), 407.7 ms avg latency, 1846.0 ms max latency.
67 records sent, 0.1 records/sec (65.10 MB/sec), 456.2 ms avg latency, 1414.0 ms max latency.
376251 records sent, 75175.0 records/sec (73.41 MB/sec), 401.9 ms avg latency, 1897.0 ms max latency.
354434 records sent, 70886.8 records/sec (69.23 MB/sec), 425.8 ms avg latency, 1601.0 ms max latency.
353795 records sent, 70744.9 records/sec (69.09 MB/sec), 411.7 ms avg latency, 1563.0 ms max latency.
321993 records sent, 64360.0 records/sec (62.85 MB/sec), 447.3 ms avg latency, 1975.0 ms max latency.
404075 records sent, 80750.4 records/sec (78.86 MB/sec), 408.4 ms avg latency, 1753.0 ms max latency.
384526 records sent, 76905.2 records/sec (75.10 MB/sec), 406.0 ms avg latency, 1833.0 ms max latency.
387652 records sent, 77483.9 records/sec (75.67 MB/sec), 397.3 ms avg latency, 1927.0 ms max latency.
343286 records sent, 68629.7 records/sec (67.02 MB/sec), 455.6 ms avg latency, 1685.0 ms max latency.
00 records sent, 66646.7 records/sec (65.08 MB/sec), 456.6 ms avg latency, 2146.0 ms max latency.
361191 records sent, 72238.2 records/sec (70.55 MB/sec), 409.4 ms avg latency, 2125.0 ms max latency.
357525 records sent, 71490.7 records/sec (69.82 MB/sec), 436.0 ms avg latency, 1502.0 ms max latency.
340238 records sent, 68047.6 records/sec (66.45 MB/sec), 427.9 ms avg latency, 1932.0 ms max latency.
390016 records sent, 77956.4 records/sec (76.13 MB/sec), 418.5 ms avg latency, 1807.0 ms max latency.
352830 records sent, 70523.7 records/sec (68.87 MB/sec), 439.4 ms avg latency, 1892.0 ms max latency.
354526 records sent, 70905.2 records/sec (69.24 MB/sec), 429.6 ms avg latency, 2128.0 ms max latency.
356670 records sent, 71305.5 records/sec (69.63 MB/sec), 408.9 ms avg latency, 1329.0 ms max latency.
309204 records sent, 60687.7 records/sec (59.27 MB/sec), 438.6 ms avg latency, 2566.0 ms max latency.
366715 records sent, 72316.1 records/sec (70.62 MB/sec), 474.5 ms avg latency, 2169.0 ms max latency.
375174 records sent, 75034.8 records/sec (73.28 MB/sec), 429.9 ms avg latency, 1722.0 ms max latency.
359400 records sent, 70346.4 records/sec (68.70 MB/sec), 432.1 ms avg latency, 1961.0 ms max latency.
312276 records sent, 62430.2 records/sec (60.97 MB/sec), 477.4 ms avg latency, 2006.0 ms max latency.
{code}
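The "jitter" the report describes can be quantified from the per-interval MB/sec figures in the perf-test output. A minimal, self-contained sketch (not part of the report; class and method names are illustrative) that computes the mean and the coefficient of variation (stddev / mean) over a few of the samples above:

```java
import java.util.Arrays;

public class ThroughputJitter {

    // Arithmetic mean of per-interval throughput samples (MB/sec).
    static double mean(double[] xs) {
        return Arrays.stream(xs).average().orElse(0.0);
    }

    // Coefficient of variation: population stddev divided by the mean.
    // Higher values mean more jitter between perf-test intervals.
    static double coefficientOfVariation(double[] xs) {
        double m = mean(xs);
        double variance = Arrays.stream(xs)
                .map(x -> (x - m) * (x - m))
                .average()
                .orElse(0.0);
        return Math.sqrt(variance) / m;
    }

    public static void main(String[] args) {
        // MB/sec samples taken from the first lines of the perf-test output above.
        double[] samples = {81.99, 108.41, 107.95, 107.93, 105.13, 99.91, 99.94,
                            101.38, 100.26, 90.49, 96.53, 98.88, 76.89};
        System.out.printf("mean=%.2f MB/sec, cv=%.3f%n",
                mean(samples), coefficientOfVariation(samples));
    }
}
```

Running the same computation over the 1.1.0 and 3.5.1 outputs would make the peak-throughput jitter comparison concrete rather than visual.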
[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748367#comment-17748367 ] Matthias J. Sax commented on KAFKA-15259: - Thanks for getting back – I was just asking a general question :) The DLQ feature seems to be independent though, and I agree that it could be useful to add to Kafka Streams.

About the handler: looking into the stacktrace, it seems that the issue is actually happening during commit, in particular when offsets are written: sendOffsetsToTransaction(KafkaProducer.java:757) This is a totally different code path. The `ProductionExceptionHandler` covers the `producer.send()` code path only. – Looking into the code of both 3.1 and 3.2, the behavior should be the same: for the call to `sendOffsetsToTransaction` the handler won't be triggered. And for this case, we also cannot trigger the handler, because there is nothing to be dropped on the floor – Kafka Streams tries to write offsets to commit a TX, and we cannot skip writing offsets.

{quote}Our additional testing revealed that "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka Streams 3.2.0, as rollback occurs. {quote}

Did you test this for `send()` or the commit case? For the `send()` case it should work for both versions; for the commit case it should not work for either version (and is something that cannot be fixed). Curious to hear about your findings.
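The distinction Matthias draws – the handler is consulted on the `producer.send()` path, never on the offset-commit path – can be sketched in a self-contained way. Note the interface below is a simplified stand-in for Kafka Streams' real `ProductionExceptionHandler`, not the actual API; the class and method names are illustrative only:

```java
import java.util.function.BiFunction;

// Simplified stand-in for Kafka Streams' ProductionExceptionHandler:
// it is only consulted when producer.send() fails for a record, never
// when writing offsets for a transaction commit fails.
public class SendPathHandlerDemo {
    enum Response { CONTINUE, FAIL }

    // (recordDescription, exception) -> handler decision; CONTINUE means
    // "drop this record on the floor and keep processing".
    static final BiFunction<String, Exception, Response> HANDLER =
        (record, e) -> Response.CONTINUE;

    // Models the send() path: a single failed record CAN be skipped,
    // so a CONTINUE decision lets processing carry on.
    static boolean trySend(String record, boolean tooLarge) {
        if (tooLarge) {
            Response r = HANDLER.apply(record, new IllegalStateException("RecordTooLarge"));
            return r == Response.CONTINUE;
        }
        return true;
    }

    // Models the commit path: offsets must be written to commit the
    // transaction, so there is nothing the handler could skip. The handler
    // is never consulted here; a failure always aborts the transaction.
    static boolean tryCommitOffsets(boolean commitFails) {
        return !commitFails;
    }

    public static void main(String[] args) {
        System.out.println(trySend("big-record", true));   // send path: handler can continue
        System.out.println(tryCommitOffsets(true));        // commit path: always aborts on failure
    }
}
```

This is why, under exactly-once, a failure surfacing in `sendOffsetsToTransaction` rolls the transaction back regardless of what the configured handler would return.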
> Kafka Streams does not continue processing due to rollback despite
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.5.1
> Reporter: Tomonari Yamashita
> Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, app_exactly_once.log
>
> [Problem]
> - Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
> -- "CONTINUE will signal that Streams should ignore the issue and continue processing" (1), so Kafka Streams should continue processing even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is used.
> -- However, if using execute_once, Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE, and the client will be shut down as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT).
> [Environment]
> - Kafka Streams 3.5.1
> [Reproduction procedure]
> # Create "input-topic" and "output-topic" topics
> # Put several messages on "input-topic"
> # Execute a simple Kafka Streams program that transfers too-large messages from "input-topic" to "output-topic" with execute_once and returns ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the producer. Please refer to the reproducer program (attached file: Reproducer.java).
> # ==> However, Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE, and the stream thread shuts down as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to the debug log (attached file: app_exactly_once.log).
> ## My expected behavior is that Kafka Streams should continue processing even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is used.
> [As far as my investigation]
> - FYI, if using at_least_once instead of execute_once, Kafka Streams continues processing without rollback when ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION default.production.exception.handler - [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Exception occurred during message send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 1188 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread
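For reference, the 1048576 in the RecordTooLargeException above is the default value of the producer's `max.request.size` (1 MiB). In the reproducer the oversized record is intentional, but if one actually wanted such records to be accepted, the relevant settings would be along these lines (values are illustrative, and the broker/topic limits must be raised in step or the broker still rejects the batch):

```properties
# Producer: allow serialized requests up to ~2 MiB (default is 1048576 bytes).
max.request.size=2097152
# Broker side: message.max.bytes=2097152
# Topic override: max.message.bytes=2097152
```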
[jira] [Updated] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter
[ https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jianbin.chen updated KAFKA-15264: - Attachment: image-2023-07-28-09-51-01-662.png
[GitHub] [kafka] jeffkbkim commented on pull request #14117: MINOR: Code cleanups in group-coordinator module
jeffkbkim commented on PR #14117: URL: https://github.com/apache/kafka/pull/14117#issuecomment-1654851021 can you point me to where the log context includes the topic partition info? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter
jianbin.chen created KAFKA-15264: Summary: Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter Key: KAFKA-15264 URL: https://issues.apache.org/jira/browse/KAFKA-15264 Project: Kafka Issue Type: Bug Reporter: jianbin.chen
[GitHub] [kafka] philipnee commented on pull request #14086: MINOR: Test assign() and assignment() in the integration test
philipnee commented on PR #14086: URL: https://github.com/apache/kafka/pull/14086#issuecomment-1654826392 Hey @junrao Thanks a lot for reviewing this. I fixed the broken build and here are the failing tests. I believe they are unrelated:
```
Build / JDK 17 and Scala 2.13 / testReplication() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest 3m 11s
Build / JDK 17 and Scala 2.13 / testDescribeAtMinIsrPartitions(String).quorum=kraft – kafka.admin.TopicCommandIntegrationTest 19s
Build / JDK 8 and Scala 2.12 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest 2m 3s
Build / JDK 8 and Scala 2.12 / [3] idBlockLen=10 – kafka.coordinator.transaction.ProducerIdManagerTest <1s
Build / JDK 8 and Scala 2.12 / testSnapshotsGenerated() – kafka.server.RaftClusterSnapshotTest 26s
Build / JDK 8 and Scala 2.12 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest 12s
Build / JDK 8 and Scala 2.12 / shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once] – org.apache.kafka.streams.integration.EosIntegrationTest 12s
Build / JDK 11 and Scala 2.13 / testDescribeAtMinIsrPartitions(String).quorum=kraft – kafka.admin.TopicCommandIntegrationTest 18s
Build / JDK 11 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest 2m 0s
Build / JDK 20 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest 2m 0s
Build / JDK 20 and Scala 2.13 / testRackAwareRangeAssignor() – integration.kafka.server.FetchFromFollowerIntegrationTest 40s
Build / JDK 20 and Scala 2.13 / testRackAwareRangeAssignor() – integration.kafka.server.FetchFromFollowerIntegrationTest 42s
Build / JDK 20 and Scala 2.13 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest 12s
```
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276971308

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##

@@ -8540,5 +8568,1182 @@ private static class RebalanceResult {
         this.followerAssignment = followerAssignment;
     }
 }
+
+    @Test
+    public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createGenericGroup("group-id");
+
+        RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+            context,
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("leader-instance-id")
+            .withMemberId(rebalanceResult.leaderId)
+            .withGenerationId(rebalanceResult.generationId)
+            .build();
+
+        CompletableFuture syncFuture = new CompletableFuture<>();
+        CoordinatorResult result = context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertTrue(result.records().isEmpty());
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(rebalanceResult.leaderId)
+            .setGenerationId(rebalanceResult.generationId);
+
+        HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode());
+
+        HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat(
+            heartbeatRequest.setGroupInstanceId("leader-instance-id")
+                .setMemberId("invalid-member-id"));
+
+        assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatUnknownGroup() {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(-1);
+
+        HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatDeadGroup() {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        group.transitionTo(DEAD);
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(-1);
+
+        HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatEmptyGroup() {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection();
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(new byte[]{0}));
+
+        group.add(new GenericGroupMember(
+            "member-id",
+            Optional.empty(),
+            "client-id",
+            "client-host",
+            1,
+            5000,
+            "consumer",
+            protocols
+        ));
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(EMPTY);
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(0);
+
+        HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatUnknownMemberExistingGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276959521 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
[GitHub] [kafka] cmccabe merged pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe merged PR #13643: URL: https://github.com/apache/kafka/pull/13643
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276944101 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276943880

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##

@@ -8540,5 +8568,1182 @@ private static class RebalanceResult {
             this.followerAssignment = followerAssignment;
         }
     }
+
+    @Test
+    public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exception {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createGenericGroup("group-id");
+
+        RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+            context,
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("leader-instance-id")
+            .withMemberId(rebalanceResult.leaderId)
+            .withGenerationId(rebalanceResult.generationId)
+            .build();
+
+        CompletableFuture syncFuture = new CompletableFuture<>();
+        CoordinatorResult result = context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertTrue(result.records().isEmpty());
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(rebalanceResult.leaderId)
+            .setGenerationId(rebalanceResult.generationId);
+
+        HeartbeatResponseData validHeartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode());
+
+        HeartbeatResponseData inValidHeartbeatResponse = context.sendGenericGroupHeartbeat(
+            heartbeatRequest.setGroupInstanceId("leader-instance-id")
+                .setMemberId("invalid-member-id"));
+
+        assertEquals(Errors.FENCED_INSTANCE_ID.code(), inValidHeartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatUnknownGroup() {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(-1);
+
+        HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatDeadGroup() {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        group.transitionTo(DEAD);
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(-1);
+
+        HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatEmptyGroup() {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection();
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(new byte[]{0}));
+
+        group.add(new GenericGroupMember(
+            "member-id",
+            Optional.empty(),
+            "client-id",
+            "client-host",
+            1,
+            5000,
+            "consumer",
+            protocols
+        ));
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(EMPTY);
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(0);
+
+        HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatUnknownMemberExistingGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276942521

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276939557

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276936889

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276923811

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276922896

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276921862

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##

+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(EMPTY);

Review Comment:
   thanks. removed the transitions, the group starts off as empty

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
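The tests quoted in this thread pin down how generic-group heartbeats map group state to error codes: an unknown group and an EMPTY group both surface UNKNOWN_MEMBER_ID, while a DEAD group surfaces COORDINATOR_NOT_AVAILABLE. As an illustration only, here is a self-contained sketch of that mapping; the class, enum, and helper names below are stand-ins, not Kafka's actual GroupMetadataManager types.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of the heartbeat error mapping the tests above assert.
public class HeartbeatErrorMappingSketch {
    public enum GroupState { EMPTY, PREPARING_REBALANCE, STABLE, DEAD }
    public enum Error { NONE, UNKNOWN_MEMBER_ID, COORDINATOR_NOT_AVAILABLE }

    public static final class Group {
        public GroupState state = GroupState.EMPTY; // a newly created group starts off EMPTY
        public final Set<String> members = new HashSet<>();
    }

    // Hypothetical helper mirroring the checks the tests exercise.
    public static Error heartbeat(Group group, String memberId) {
        if (group == null) return Error.UNKNOWN_MEMBER_ID;                        // unknown group
        if (group.state == GroupState.DEAD) return Error.COORDINATOR_NOT_AVAILABLE; // dead group
        if (group.state == GroupState.EMPTY) return Error.UNKNOWN_MEMBER_ID;      // empty group
        if (!group.members.contains(memberId)) return Error.UNKNOWN_MEMBER_ID;    // unknown member
        return Error.NONE;
    }

    public static void main(String[] args) {
        Group group = new Group();
        if (heartbeat(null, "member-id") != Error.UNKNOWN_MEMBER_ID) throw new AssertionError();
        if (heartbeat(group, "member-id") != Error.UNKNOWN_MEMBER_ID) throw new AssertionError();
        group.state = GroupState.DEAD;
        if (heartbeat(group, "member-id") != Error.COORDINATOR_NOT_AVAILABLE) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Note that, per the review comment above, no explicit transition into EMPTY is needed in the tests because that is the initial state.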
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276919519

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##

+        RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(

Review Comment:
   resolved via rebase
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276919382

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##

+        context.createGenericGroup("group-id");

Review Comment:
   Thanks for the catch. Removed from all tests that call staticMembersJoinAndRebalance
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
jeffkbkim commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1276874868

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ##

@@ -368,9 +369,29 @@ public CompletableFuture heartbeat(
             return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        if (!isGroupIdNotEmpty(request.groupId())) {
+            return CompletableFuture.completedFuture(new HeartbeatResponseData()
+                .setErrorCode(Errors.INVALID_GROUP_ID.code()));
+        }
+
+        return runtime.scheduleReadOperation("generic-group-heartbeat",
+            topicPartitionFor(request.groupId()),
+            (coordinator, __) -> coordinator.genericGroupHeartbeat(context, request)

Review Comment:
   as discussed offline, we will keep it as is since it is how the current coordinator behaves.
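The hunk above follows a validate-then-schedule shape: an empty group id is rejected immediately with INVALID_GROUP_ID, and only valid requests are scheduled as a read operation against the coordinator shard that owns the group. The following is a rough, self-contained sketch of that control flow; the types, the error-code constants, and the scheduleReadOperation stand-in below are assumptions for illustration, not Kafka's actual API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Sketch of "validate up front, otherwise schedule a read operation".
public class HeartbeatValidationSketch {
    // Stand-in error codes, not necessarily Kafka's real numeric values.
    public static final short NONE = 0;
    public static final short INVALID_GROUP_ID = 24;

    // Hypothetical stand-in for runtime.scheduleReadOperation(name, partition, op).
    static CompletableFuture<Short> scheduleReadOperation(String name, Supplier<Short> op) {
        return CompletableFuture.supplyAsync(op); // run asynchronously, like a coordinator read
    }

    public static CompletableFuture<Short> heartbeat(String groupId) {
        if (groupId == null || groupId.isEmpty()) {
            // Invalid group ids are rejected without ever reaching the coordinator.
            return CompletableFuture.completedFuture(INVALID_GROUP_ID);
        }
        return scheduleReadOperation("generic-group-heartbeat", () -> NONE);
    }

    public static void main(String[] args) {
        if (heartbeat("").join() != INVALID_GROUP_ID) throw new AssertionError();
        if (heartbeat("group-id").join() != NONE) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Completing the future directly for the invalid case avoids scheduling work on the coordinator runtime at all, which is the point of doing the group-id check first.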
[GitHub] [kafka] philipnee opened a new pull request, #14118: KAFKA-14875: Implement wakeup
philipnee opened a new pull request, #14118: URL: https://github.com/apache/kafka/pull/14118 Continuation of https://github.com/apache/kafka/pull/13490. I closed the original one due to rebase difficulties. **Summary** Implemented the wakeup() mechanism using a WakeupTrigger class that stores the pending wakeup item. When wakeup() is invoked, it checks whether there's an active task or a pending wakeup. - If there's an active task: the task is completed exceptionally and the atomic reference is freed up. - If there's a pending wakeup, wakeup() was invoked before a blocking call was issued, so the current task is completed exceptionally immediately. This PR also addressed minor issues such as: 1. Throwing WakeupException at the right place: as wakeups are signalled by completing an active future exceptionally, the WakeupException arrives wrapped inside an ExecutionException. 2. mockConstruction is a thread-local mock; therefore, we need to free up the reference before completing the test. Otherwise, other tests will continue using the thread-local mock.
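The two cases described in the PR summary can be sketched as below. This is a hypothetical illustration of the atomic-reference pattern, not the actual consumer code: the class name WakeupTrigger comes from the PR text, but the WAKEUP_PENDING marker, method names, and use of a plain RuntimeException in place of WakeupException are assumptions made for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hedged sketch of the wakeup mechanism described in the PR text.
// Models the two cases: wakeup() while a task is active, and wakeup()
// arriving before the next blocking call is issued.
final class WakeupTrigger {
    private static final Object WAKEUP_PENDING = new Object();
    // Holds the active task's future, the WAKEUP_PENDING marker, or null.
    private final AtomicReference<Object> pendingTask = new AtomicReference<>();

    // Invoked from another thread via consumer.wakeup().
    void wakeup() {
        Object previous = pendingTask.getAndUpdate(current ->
            current instanceof CompletableFuture ? null : WAKEUP_PENDING);
        if (previous instanceof CompletableFuture) {
            // Active task: complete it exceptionally and free up the reference.
            ((CompletableFuture<?>) previous)
                .completeExceptionally(new RuntimeException("woken up"));
        }
    }

    // Invoked when a blocking call starts.
    <T> CompletableFuture<T> setActiveTask(CompletableFuture<T> task) {
        Object previous = pendingTask.getAndUpdate(current ->
            current == WAKEUP_PENDING ? null : task);
        if (previous == WAKEUP_PENDING) {
            // wakeup() fired before the blocking call: fail immediately.
            task.completeExceptionally(new RuntimeException("woken up"));
        }
        return task;
    }
}
```

A caller would see the failure surface via the future's ExecutionException wrapper, which matches point 1 in the summary.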
[GitHub] [kafka] philipnee closed pull request #13490: KAFKA-14875: Implement wakeup
philipnee closed pull request #13490: KAFKA-14875: Implement wakeup URL: https://github.com/apache/kafka/pull/13490
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276830477 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -764,4 +813,39 @@ public void testNewOffsetCommitTombstoneRecord() { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + +// For the purpose of testing let: +// a) Number of replicas for each partition = 2 +// b) Number of racks available in the cluster = 4 Review Comment: Okay I will add it to the javadoc
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276830285 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5567,7 +5600,7 @@ public void testJoinGroupAppendErrorConversion() { assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE); } -private void assertUnorderedListEquals( +public static void assertUnorderedListEquals( Review Comment: Yeah that makes sense, I'll have to move all the corresponding methods also to that class
[GitHub] [kafka] rondagostino commented on a diff in pull request #14084: [MINOR] Add latest versions to kraft upgrade kafkatest
rondagostino commented on code in PR #14084: URL: https://github.com/apache/kafka/pull/14084#discussion_r1276771939 ## tests/kafkatest/version.py: ## @@ -252,3 +252,7 @@ def get_version(node=None): V_3_5_0 = KafkaVersion("3.5.0") V_3_5_1 = KafkaVersion("3.5.1") LATEST_3_5 = V_3_5_1 + +# 3.6.x versions +V_3_6_0 = KafkaVersion("3.6.0") +LATEST_3_6 = V_3_6_0 Review Comment: Ok, we'll just leave it here and add it back in now. Thanks.
[jira] [Assigned] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable
[ https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-14976: Assignee: Almog Gavra

> Left/outer stream-stream joins create KV stores that aren't customizable
> Key: KAFKA-14976
> URL: https://issues.apache.org/jira/browse/KAFKA-14976
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: Almog Gavra
> Priority: Major
> Labels: needs-kip
>
> It appears that we only give the illusion of full customizability when it comes to the state stores of a windowed join. This arose due to an [optimization|https://github.com/apache/kafka/pull/11252] for the performance of the spurious results fix, and means that these joins now come with one additional, and possibly unexpected, state store (the generic type parameters below are reconstructed from context, as they were stripped in transit):
>
> {code:java}
> final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder =
>     new ListValueStoreBuilder<>(
>         persistent
>             ? Stores.persistentKeyValueStore(storeName)
>             : Stores.inMemoryKeyValueStore(storeName),
>         timestampedKeyAndJoinSideSerde,
>         leftOrRightValueSerde,
>         Time.SYSTEM
>     );
> {code}
>
> where persistent is defined above that as
> {code:java}
> final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null
>     || streamJoinedInternal.thisStoreSupplier().get().persistent();
> {code}
>
> This means regardless of whether a custom state store implementation was passed in to the join, we will still insert one of our RocksDB or InMemory state stores. Which might be very surprising since the API makes it seem like the underlying stores are fully configurable.
> I'm adding a warning line for this in PR [#13682|https://github.com/apache/kafka/pull/13682/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R334-R336] but we should really make this hidden state store fully configurable like the window stores currently are (which will require a KIP)

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable
[ https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-14976: Labels: kip (was: needs-kip)
[jira] [Commented] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable
[ https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748314#comment-17748314 ] A. Sophie Blee-Goldman commented on KAFKA-14976: Will be addressed by https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types
[GitHub] [kafka] ableegoldman commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
ableegoldman commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1654436865 @flashmouse you can decrease the consumer count. For one thing, this covers the edge case where the consumers do not all have the same topic subscription, which seems to be quite rare. For another, 2,000 consumers is fairly large to begin with. And especially large for a consumer group where the consumers are all given different topic subscriptions. Honestly it's probably more important to test with a higher partition count than a large group of consumers like this. Are you able to get it passing within 90s with 500 consumers and 5,000 partitions? That feels like a more realistic large-scale test case. If not, fix it with just 500 consumers and maybe run the test with 4,000, then 3,000, then 2,000 partitions, etc, until you can get it to pass. If it's still not passing with 500 consumers and 2000 partitions, that's a bit troubling. But we can cross that bridge if it comes up
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276742165 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5567,7 +5600,7 @@ public void testJoinGroupAppendErrorConversion() { assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE); } -private void assertUnorderedListEquals( +public static void assertUnorderedListEquals( Review Comment: I see. I wonder if we should actually move `recordEquals` to `RecordHelpersTest`. The dependency would make more sense this way, no?
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276739854 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -764,4 +813,39 @@ public void testNewOffsetCommitTombstoneRecord() { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + +// For the purpose of testing let: +// a) Number of replicas for each partition = 2 +// b) Number of racks available in the cluster = 4 Review Comment: The javadoc is definitely a better place than this.
[GitHub] [kafka] dajac opened a new pull request, #14117: MINOR: Code cleanups in group-coordinator module
dajac opened a new pull request, #14117: URL: https://github.com/apache/kafka/pull/14117 This patch does a few code cleanups in the group-coordinator module. - It renames `Coordinator` to `CoordinatorShard`; - It renames `ReplicatedGroupCoordinator` to `GroupCoordinatorShard`. I was never really happy with this name. The new name makes more sense to me; - It removes `TopicPartition` from the `GroupMetadataManager`. It was only used in log messages. The log context already includes it so we don't have to log it again. - It renames `assignors` to `consumerGroupAssignors`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] kamalcph opened a new pull request, #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph opened a new pull request, #14116: URL: https://github.com/apache/kafka/pull/14116 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276710063 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -764,4 +813,39 @@ public void testNewOffsetCommitTombstoneRecord() { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + +// For the purpose of testing let: +// a) Number of replicas for each partition = 2 +// b) Number of racks available in the cluster = 4 Review Comment: It's applicable to both the mkMapOfPartitionRacks and mkListOfPartitionRacks methods, to show why there are two rackIds in each set and why the mod is 4. I got confused when I saw the old tests and didn't understand why a certain calculation was made and what the assumptions behind it were. I initially added it to the javadoc for the first method but realised it probably doesn't belong there. I'm not sure whether I should place it inside the method for both, or where the correct placement would be.
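The documented assumptions (2 replicas per partition, 4 racks in the cluster, rack ids assigned mod 4) can be sketched as a test fixture like the one below. This is a hypothetical reconstruction for illustration; the real mkMapOfPartitionRacks in RecordHelpersTest may differ in details such as the rack-name prefix.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class PartitionRackFixtures {
    // Hypothetical reconstruction: each partition gets 2 replica racks,
    // drawn from 4 racks in the cluster as (i % 4) and ((i + 1) % 4).
    static Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) {
        Map<Integer, Set<String>> partitionRacks = new HashMap<>(numPartitions);
        for (int partition = 0; partition < numPartitions; partition++) {
            partitionRacks.put(partition, new HashSet<>(Arrays.asList(
                "rack" + (partition % 4),
                "rack" + ((partition + 1) % 4))));
        }
        return partitionRacks;
    }
}
```

Under these assumptions, partition 0 maps to {rack0, rack1} and partition 3 wraps around to {rack3, rack0}, which is why each set holds exactly two rackIds and why the mod is 4.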
[GitHub] [kafka] cmccabe commented on pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on PR #13643: URL: https://github.com/apache/kafka/pull/13643#issuecomment-1654254344 > I couldn't find a test for the new KafkaRaftClient::scheduleAtomicAppend. I added `KafkaRaftClientTest.testAppendWithRequiredBaseOffset`.
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276706144 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5692,12 +5725,60 @@ private void assertApiMessageAndVersionEquals( fromTopicPartitions(actualValue.partitionsPendingRevocation())); assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()), fromTopicPartitions(actualValue.partitionsPendingAssignment())); -} else { -assertEquals(expected.message(), actual.message()); +} else if (actual.message() instanceof ConsumerGroupPartitionMetadataValue) { +// The order of the racks stored in PartitionMetadata of ConsumerGroupPartitionMetadataValue is not +// always guaranteed. Therefore, we need a special comparator. +ConsumerGroupPartitionMetadataValue expectedValue = +(ConsumerGroupPartitionMetadataValue) expected.message(); +ConsumerGroupPartitionMetadataValue actualValue = +(ConsumerGroupPartitionMetadataValue) actual.message(); + +List expectedTopicMetadataList = +expectedValue.topics(); +List actualTopicMetadataList = +actualValue.topics(); + +if (expectedTopicMetadataList.size() != actualTopicMetadataList.size()) { +fail("Topic metadata lists have different sizes"); +} + +for (int i = 0; i < expectedValue.topics().size(); i++) { +ConsumerGroupPartitionMetadataValue.TopicMetadata expectedTopicMetadata = +expectedTopicMetadataList.get(i); +ConsumerGroupPartitionMetadataValue.TopicMetadata actualTopicMetadata = +actualTopicMetadataList.get(i); + +assertEquals(expectedTopicMetadata.topicId(), actualTopicMetadata.topicId()); +assertEquals(expectedTopicMetadata.topicName(), actualTopicMetadata.topicName()); +assertEquals(expectedTopicMetadata.numPartitions(), actualTopicMetadata.numPartitions()); + +List expectedPartitionMetadataList = +expectedTopicMetadata.partitionMetadata(); +List actualPartitionMetadataList = +actualTopicMetadata.partitionMetadata(); + +// If the list is empty, rack information wasn't available for any replica of +// the partition and hence, the entry wasn't added to the record. +if (expectedPartitionMetadataList.size() != actualPartitionMetadataList.size()) { +fail("Partition metadata lists have different sizes"); +} else if (!expectedPartitionMetadataList.isEmpty() && !actualPartitionMetadataList.isEmpty()) { +for (int j = 0; j < expectedTopicMetadata.numPartitions(); j++) { +ConsumerGroupPartitionMetadataValue.PartitionMetadata expectedPartitionMetadata = +expectedPartitionMetadataList.get(j); +ConsumerGroupPartitionMetadataValue.PartitionMetadata actualPartitionMetadata = +actualPartitionMetadataList.get(j); + +assertEquals(expectedPartitionMetadata.partition(), actualPartitionMetadata.partition()); + assertUnorderedListEquals(expectedPartitionMetadata.racks(), actualPartitionMetadata.racks()); +} +} else { +assertEquals(expected.message(), actual.message()); Review Comment: Sorry that was indented wrong, this is the else for the outermost if and else if where the message type is checked
[GitHub] [kafka] cmccabe commented on pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on PR #13643: URL: https://github.com/apache/kafka/pull/13643#issuecomment-1654251894 > @cmccabe, I don't follow this comment. When the client calls KafkaRaftClient::schedule{Atomic}Append the KafkaRaftClient compare the provided offset with the nextOffset stored in the BatchAccumulator. If we want this method to succeed in most cases KafkaRaftClient::logEndOffset should return that offset, BatchAccumulator::nextOffset and not the log end offset. There can't be anything in the accumulator when we become active, because we are not adding things to the accumulator when we are inactive. Therefore all we need to know is the end of the log. After becoming active, the active controller tracks its own offset and doesn't need to access the offset in BatchAccumulator or the end of the log offset.
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276701201 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5567,7 +5600,7 @@ public void testJoinGroupAppendErrorConversion() { assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE); } -private void assertUnorderedListEquals( +public static void assertUnorderedListEquals( Review Comment: My bad, I only had to make the recordEquals method public to be able to use it in the RecordHelpersTest; reverted the others to private.
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276693242 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java: ## @@ -40,23 +45,31 @@ public class TopicMetadata { */ private final int numPartitions; +/** + * Map of every partition to a set of its rackIds. + * If the rack information is unavailable, this is an empty map. Review Comment: changed the wording
[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
cmccabe commented on code in PR #13643: URL: https://github.com/apache/kafka/pull/13643#discussion_r1276690892 ## raft/src/main/java/org/apache/kafka/raft/RaftClient.java: ## @@ -171,16 +172,21 @@ default void beginShutdown() {} * to resign its leadership. The state machine is expected to discard all * uncommitted entries after observing an epoch change. * + * If the current base offset does not match the supplied required base offset, + * then this method will throw {@link UnexpectedBaseOffsetException}. + * * @param epoch the current leader epoch + * @param requiredBaseOffset if this is set, it is the offset we must use as the base offset. * @param records the list of records to append * @return the expected offset of the last record if append succeed * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum * batch size; if this exception is throw none of the elements in records were * committed * @throws NotLeaderException if we are not the current leader or the epoch doesn't match the leader epoch * @throws BufferAllocationException we failed to allocate memory for the records + * @throws UnexpectedBaseOffsetException the requested base offset could not be obtained. */ -long scheduleAtomicAppend(int epoch, List records); +long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List records); Review Comment: The controller doesn't use `scheduleAppend`. We only use `scheduleAtomicAppend`.
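The contract in the javadoc above, that the append must fail with UnexpectedBaseOffsetException when the requested base offset cannot be obtained, boils down to comparing the caller's OptionalLong against the next offset at append time. Here is a minimal, self-contained sketch of that check; the class name AppendOffsetCheck and the locally defined exception are illustrative stand-ins, not the actual Kafka raft implementation.

```java
import java.util.OptionalLong;

// Hedged sketch of the requiredBaseOffset check described in the PR's javadoc.
final class AppendOffsetCheck {
    // Stand-in for Kafka's UnexpectedBaseOffsetException, defined locally
    // so the sketch compiles on its own.
    static class UnexpectedBaseOffsetException extends RuntimeException {
        UnexpectedBaseOffsetException(String message) {
            super(message);
        }
    }

    // Before appending, verify the next offset matches the caller's expectation.
    // An empty OptionalLong means the caller doesn't care about the base offset.
    static void validateBaseOffset(long nextOffset, OptionalLong requiredBaseOffset) {
        if (requiredBaseOffset.isPresent() && requiredBaseOffset.getAsLong() != nextOffset) {
            throw new UnexpectedBaseOffsetException(
                "Wanted base offset " + requiredBaseOffset.getAsLong() +
                ", but the next offset was " + nextOffset);
        }
    }
}
```

This also makes the discussion in the surrounding comments concrete: the check only succeeds reliably if the caller learns the same "next offset" that the append path will use.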
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276677238 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ## @@ -440,12 +442,13 @@ public void testUpdateSubscriptionMetadata() { // It should return foo now. assertEquals( mkMap( -mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) +mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, Collections.emptyMap())) Review Comment: Should we update `testUpdateSubscriptionMetadata` to include racks for some topics? Otherwise, it seems that we don't really test the newly added logic into `computeSubscriptionMetadata`.
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276671418 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ## @@ -84,14 +85,15 @@ public void addGroupMember( public Uuid addTopicMetadata( String topicName, -int numPartitions +int numPartitions, +Map> partitionRacks ) { Uuid topicId = Uuid.randomUuid(); subscriptionMetadata.put(topicName, new TopicMetadata( topicId, topicName, -numPartitions -)); +numPartitions, +partitionRacks)); Review Comment: nit: Let's bring back the closing parentheses as they were.
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276670928 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java: ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class SubscribedTopicMetadataTest { + +@Test +public void testAttribute() { +Map topicMetadataMap = new HashMap<>(); +for (int i = 0; i < 5; i++) { +Uuid topicId = Uuid.randomUuid(); +String topicName = "topic" + i; +Map> partitionRacks = mkMapOfPartitionRacks(5); +topicMetadataMap.put( +topicId, +new TopicMetadata(topicId, topicName, 5, partitionRacks) +); +} +assertEquals(topicMetadataMap, new SubscribedTopicMetadata(topicMetadataMap).topicMetadata()); +} + +@Test +public void testTopicMetadataCannotBeNull() { +assertThrows(NullPointerException.class, () -> new SubscribedTopicMetadata(null)); +} + +@Test +public void testEquals() { +Map topicMetadataMap = new HashMap<>(); +for (int i = 0; i < 5; i++) { +Uuid topicId = Uuid.randomUuid(); +String topicName = "topic" + i; +Map> partitionRacks = mkMapOfPartitionRacks(5); +topicMetadataMap.put( +topicId, +new TopicMetadata(topicId, topicName, 5, partitionRacks) +); +} +SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap); +assertEquals(new SubscribedTopicMetadata(topicMetadataMap), subscribedTopicMetadata); + +Map topicMetadataMap2 = new HashMap<>(); +Uuid topicId = Uuid.randomUuid(); +topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5, Collections.emptyMap())); +assertNotEquals(new SubscribedTopicMetadata(topicMetadataMap2), subscribedTopicMetadata); +} +} Review Comment: nit: Let's add a new line at the end. 
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276670286 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -571,4 +716,14 @@ private void assertAssignment(Map>> expectedAssig assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); } } + +// When rack awareness is enabled for this assignor, rack information can be updated in this method. +private Map<Integer, Set<String>> createPartitionMetadata(int numPartitions) { Review Comment: nit: static?
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276669652 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -764,4 +813,39 @@ public void testNewOffsetCommitTombstoneRecord() { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + +// For the purpose of testing let: +// a) Number of replicas for each partition = 2 +// b) Number of racks available in the cluster = 4 Review Comment: Should we remove this? It does not make sense here. Or was it supposed to be somewhere else?
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276668936 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -612,15 +616,14 @@ public void testNewGroupMetadataRecordThrowsWhenEmptyAssignment() { MetadataVersion.IBP_3_5_IV2 )); } - Review Comment: It is not really about the empty line but more about the unnecessary spaces on it.
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r127851 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5692,12 +5725,60 @@ private void assertApiMessageAndVersionEquals( fromTopicPartitions(actualValue.partitionsPendingRevocation())); assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()), fromTopicPartitions(actualValue.partitionsPendingAssignment())); -} else { -assertEquals(expected.message(), actual.message()); +} else if (actual.message() instanceof ConsumerGroupPartitionMetadataValue) { +// The order of the racks stored in PartitionMetadata of ConsumerGroupPartitionMetadataValue is not +// always guaranteed. Therefore, we need a special comparator. +ConsumerGroupPartitionMetadataValue expectedValue = +(ConsumerGroupPartitionMetadataValue) expected.message(); +ConsumerGroupPartitionMetadataValue actualValue = +(ConsumerGroupPartitionMetadataValue) actual.message(); + +List expectedTopicMetadataList = +expectedValue.topics(); +List actualTopicMetadataList = +actualValue.topics(); + +if (expectedTopicMetadataList.size() != actualTopicMetadataList.size()) { +fail("Topic metadata lists have different sizes"); +} + +for (int i = 0; i < expectedValue.topics().size(); i++) { +ConsumerGroupPartitionMetadataValue.TopicMetadata expectedTopicMetadata = +expectedTopicMetadataList.get(i); +ConsumerGroupPartitionMetadataValue.TopicMetadata actualTopicMetadata = +actualTopicMetadataList.get(i); + +assertEquals(expectedTopicMetadata.topicId(), actualTopicMetadata.topicId()); +assertEquals(expectedTopicMetadata.topicName(), actualTopicMetadata.topicName()); +assertEquals(expectedTopicMetadata.numPartitions(), actualTopicMetadata.numPartitions()); + +List expectedPartitionMetadataList = +expectedTopicMetadata.partitionMetadata(); +List actualPartitionMetadataList = +actualTopicMetadata.partitionMetadata(); + +// If the list is empty, rack 
information wasn't available for any replica of +// the partition and hence, the entry wasn't added to the record. +if (expectedPartitionMetadataList.size() != actualPartitionMetadataList.size()) { +fail("Partition metadata lists have different sizes"); +} else if (!expectedPartitionMetadataList.isEmpty() && !actualPartitionMetadataList.isEmpty()) { +for (int j = 0; j < expectedTopicMetadata.numPartitions(); j++) { +ConsumerGroupPartitionMetadataValue.PartitionMetadata expectedPartitionMetadata = +expectedPartitionMetadataList.get(j); +ConsumerGroupPartitionMetadataValue.PartitionMetadata actualPartitionMetadata = +actualPartitionMetadataList.get(j); + +assertEquals(expectedPartitionMetadata.partition(), actualPartitionMetadata.partition()); + assertUnorderedListEquals(expectedPartitionMetadata.racks(), actualPartitionMetadata.racks()); +} +} else { +assertEquals(expected.message(), actual.message()); Review Comment: Why are we doing this? If the partition metadata are empty, we could just run the previous branch as well. It would be a no-op.
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276663083 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5692,12 +5725,60 @@ private void assertApiMessageAndVersionEquals( fromTopicPartitions(actualValue.partitionsPendingRevocation())); assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()), fromTopicPartitions(actualValue.partitionsPendingAssignment())); -} else { -assertEquals(expected.message(), actual.message()); +} else if (actual.message() instanceof ConsumerGroupPartitionMetadataValue) { +// The order of the racks stored in PartitionMetadata of ConsumerGroupPartitionMetadataValue is not +// always guaranteed. Therefore, we need a special comparator. +ConsumerGroupPartitionMetadataValue expectedValue = +(ConsumerGroupPartitionMetadataValue) expected.message(); +ConsumerGroupPartitionMetadataValue actualValue = +(ConsumerGroupPartitionMetadataValue) actual.message(); + +List expectedTopicMetadataList = +expectedValue.topics(); +List actualTopicMetadataList = +actualValue.topics(); + +if (expectedTopicMetadataList.size() != actualTopicMetadataList.size()) { +fail("Topic metadata lists have different sizes"); +} + +for (int i = 0; i < expectedValue.topics().size(); i++) { +ConsumerGroupPartitionMetadataValue.TopicMetadata expectedTopicMetadata = +expectedTopicMetadataList.get(i); +ConsumerGroupPartitionMetadataValue.TopicMetadata actualTopicMetadata = +actualTopicMetadataList.get(i); + +assertEquals(expectedTopicMetadata.topicId(), actualTopicMetadata.topicId()); +assertEquals(expectedTopicMetadata.topicName(), actualTopicMetadata.topicName()); +assertEquals(expectedTopicMetadata.numPartitions(), actualTopicMetadata.numPartitions()); + +List expectedPartitionMetadataList = +expectedTopicMetadata.partitionMetadata(); +List actualPartitionMetadataList = +actualTopicMetadata.partitionMetadata(); + +// If the list is empty, rack 
information wasn't available for any replica of +// the partition and hence, the entry wasn't added to the record. +if (expectedPartitionMetadataList.size() != actualPartitionMetadataList.size()) { +fail("Partition metadata lists have different sizes"); +} else if (!expectedPartitionMetadataList.isEmpty() && !actualPartitionMetadataList.isEmpty()) { +for (int j = 0; j < expectedTopicMetadata.numPartitions(); j++) { Review Comment: Using numPartitions seems incorrect here as it could be larger than the number of partition metadata.
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276659754 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5692,12 +5725,60 @@ private void assertApiMessageAndVersionEquals( fromTopicPartitions(actualValue.partitionsPendingRevocation())); assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()), fromTopicPartitions(actualValue.partitionsPendingAssignment())); -} else { -assertEquals(expected.message(), actual.message()); +} else if (actual.message() instanceof ConsumerGroupPartitionMetadataValue) { +// The order of the racks stored in PartitionMetadata of ConsumerGroupPartitionMetadataValue is not +// always guaranteed. Therefore, we need a special comparator. +ConsumerGroupPartitionMetadataValue expectedValue = +(ConsumerGroupPartitionMetadataValue) expected.message(); +ConsumerGroupPartitionMetadataValue actualValue = +(ConsumerGroupPartitionMetadataValue) actual.message(); + Review Comment: Should we assert numPartitions as well?
[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276653838 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5567,7 +5600,7 @@ public void testJoinGroupAppendErrorConversion() { assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE); } -private void assertUnorderedListEquals( +public static void assertUnorderedListEquals( Review Comment: Is there a reason for making it public? I have the same question for the following two methods.
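The helper under discussion compares rack lists whose order is not guaranteed. A minimal, self-contained sketch of such an order-insensitive list comparison (the class and method names here are illustrative stand-ins, not Kafka's actual test utilities):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class UnorderedListEquals {
    // Compares two lists for equality while ignoring element order,
    // by sorting defensive copies before comparing.
    public static <T extends Comparable<T>> boolean unorderedEquals(List<T> a, List<T> b) {
        if (a.size() != b.size()) {
            return false;
        }
        List<T> sortedA = new ArrayList<>(a);
        List<T> sortedB = new ArrayList<>(b);
        sortedA.sort(Comparator.naturalOrder());
        sortedB.sort(Comparator.naturalOrder());
        return sortedA.equals(sortedB);
    }

    public static void main(String[] args) {
        // prints "true": same racks, different order
        System.out.println(unorderedEquals(
            Arrays.asList("rack0", "rack1"),
            Arrays.asList("rack1", "rack0")));
    }
}
```

Whether such a helper is `private` or `public static` (the point of the review comment) is purely a matter of how widely other test classes need to reuse it.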
[jira] [Commented] (KAFKA-14748) Relax non-null FK left-join requirement
[ https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748285#comment-17748285 ] Guozhang Wang commented on KAFKA-14748: --- [~aki] Thanks for picking this series! I think we can have a light KIP just to summarize: 1. All the operator behavioral changes among these JIRAs. 2. Why we do not make it an opt-in; and therefore: 3. For users who still want the old behavior, what should they change in their code. > Relax non-null FK left-join requirement > --- > > Key: KAFKA-14748 > URL: https://issues.apache.org/jira/browse/KAFKA-14748 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Florin Akermann >Priority: Major > > Kafka Streams enforces a strict non-null-key policy in the DSL across all > key-dependent operations (like aggregations and joins). > This also applies to FK-joins, in particular to the ForeignKeyExtractor. If > it returns `null`, it's treated as invalid. For left-joins, it might make > sense to still accept a `null`, and add the left-hand record with an empty > right-hand-side to the result. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins
[ https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748284#comment-17748284 ] Guozhang Wang commented on KAFKA-12317: --- [~mjsax] Though it may not introduce any new configs or interfaces I'd still suggest we have a light KIP for the behavioral change, also we would definitely want to add a section in the upgrade guide for this. > Relax non-null key requirement for left/outer KStream joins > --- > > Key: KAFKA-12317 > URL: https://issues.apache.org/jira/browse/KAFKA-12317 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Florin Akermann >Priority: Major > > Currently, for a stream-streams and stream-table/globalTable join > KafkaStreams drops all stream records with a `null`-key (`null`-join-key for > stream-globalTable), because for a `null`-(join)key the join is undefined: > ie, we don't have an attribute the do the table lookup (we consider the > stream-record as malformed). Note, that we define the semantics of > _left/outer_ join as: keep the stream record if no matching join record was > found. > We could relax the definition of _left_ stream-table/globalTable and > _left/outer_ stream-stream join though, and not drop `null`-(join)key stream > records, and call the ValueJoiner with a `null` "other-side" value instead: > if the stream record key (or join-key) is `null`, we could treat is as > "failed lookup" instead of treating the stream record as corrupted. > If we make this change, users that want to keep the current behavior, can add > a `filter()` before the join to drop `null`-(join)key records from the stream > explicitly. > Note that this change also requires to change the behavior if we insert a > repartition topic before the join: currently, we drop `null`-key record > before writing into the repartition topic (as we know they would be dropped > later anyway). 
We need to relax this behavior for a left stream-table and > left/outer stream-stream join. User need to be aware (ie, we might need to > put this into the docs and JavaDocs), that records with `null`-key would be > partitioned randomly.
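The semantic change proposed in this ticket can be illustrated outside Kafka Streams with a plain-Java sketch (the record and join shapes below are simplified stand-ins, not the Streams API): under the relaxed rules, a `null` key is treated as a failed lookup rather than a corrupted record, so the left-hand value is kept and joined with `null`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class NullKeyLeftJoinSketch {
    // A simplified stream record whose key may be null.
    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    // Left stream-table join. With relaxNullKey = false (current behavior),
    // null-key records are silently dropped. With relaxNullKey = true
    // (proposed), a null key is treated as a failed lookup and the joiner
    // runs with a null right-hand value.
    public static List<String> leftJoin(List<Rec> stream, Map<String, String> table, boolean relaxNullKey) {
        List<String> out = new ArrayList<>();
        for (Rec r : stream) {
            if (r.key == null) {
                if (relaxNullKey) {
                    out.add(r.value + "+null"); // keep left side, empty right side
                }
                continue; // old behavior: drop the record
            }
            String other = table.get(r.key); // null when no match: left join keeps the record
            out.add(r.value + "+" + other);
        }
        return out;
    }
}
```

As the ticket notes, users who prefer the old behavior could still drop null-key records explicitly with a `filter()` before the join.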
[jira] [Commented] (KAFKA-15240) BrokerToControllerChannelManager cache activeController error cause DefaultAlterPartitionManager send AlterPartition request failed
[ https://issues.apache.org/jira/browse/KAFKA-15240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748281#comment-17748281 ] Guozhang Wang commented on KAFKA-15240: --- [~lushilin] Thanks for reporting this. I think [~cmccabe] [~hachikuji] would have the most context to help investigating. > BrokerToControllerChannelManager cache activeController error cause > DefaultAlterPartitionManager send AlterPartition request failed > --- > > Key: KAFKA-15240 > URL: https://issues.apache.org/jira/browse/KAFKA-15240 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0, 2.8.1, 2.8.2, 3.5.0 > Environment: 2.8.1 kafka version >Reporter: shilin Lu >Assignee: shilin Lu >Priority: Major > Attachments: image-2023-07-24-16-35-56-589.png > > > After KIP-497,partition leader do not use zk to propagateIsrChanges,it will > send AlterPartitionRequest to controller to propagateIsrChanges.Then broker > will cache active controller node info through controllerNodeProvider > interface. > 2023.07.12,in kafka product environment,we find so much `Broker had a stale > broker epoch` when send partitionAlterRequest to controller.And in this kafka > cluster has so much replica not in isr assignment with replica fetch is > correct.So it only propagateIsrChanges failed. > !https://iwiki.woa.com/tencent/api/attachments/s3/url?attachmentid=3165506! > But there has something strange,if broker send partitionAlterRequest failed > controller will print some log like this.But in active controller node not > find this log info > !image-2023-07-24-16-35-56-589.png! > Then i just suspect this broker connect to an error active controller.Through > network packet capture, find this broker connect to an unfamiliar broker > port(9092) send request.Refer to this kafka cluster operation history,find > this unfamiliar broker is an old broker node in this cluster and this node is > a controller node in new kafka cluster. 
> Current BrokerToControllerChannelManager update active controller only > happened when disconnect or responseCode is NOT_CONTROLLER. So when no > request send and error broker node is another kafka cluster controller > node, this case will repeat.
[GitHub] [kafka] ahuang98 commented on a diff in pull request #14084: [MINOR] Add latest versions to kraft upgrade kafkatest
ahuang98 commented on code in PR #14084: URL: https://github.com/apache/kafka/pull/14084#discussion_r1276628397 ## tests/kafkatest/version.py: ## @@ -252,3 +252,7 @@ def get_version(node=None): V_3_5_0 = KafkaVersion("3.5.0") V_3_5_1 = KafkaVersion("3.5.1") LATEST_3_5 = V_3_5_1 + +# 3.6.x versions +V_3_6_0 = KafkaVersion("3.6.0") +LATEST_3_6 = V_3_6_0 Review Comment: I had noticed that 3.6 had been removed when I was working on this change and just thought I'd add it back in since it seems the convention is to add the new version whenever we cut the release branch for the prior version. Other than some OCD to follow the existing pattern, I'm indifferent to keeping it or leaving it out since as you said, it's not in use currently
[GitHub] [kafka] mumrah commented on a diff in pull request #14115: KAFKA-15263 Check KRaftMigrationDriver state in each event
mumrah commented on code in PR #14115: URL: https://github.com/apache/kafka/pull/14115#discussion_r1276619004 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -297,6 +297,16 @@ private boolean isValidStateChange(MigrationDriverState newState) { } } +private boolean checkDriverState(MigrationDriverState expectedState) { +if (migrationState.equals(expectedState)) { +return true; +} else { +log.debug("Expected driver state {} but found {}. Not running this event {}.", Review Comment: It shouldn't happen often, no. INFO sounds good to me
[GitHub] [kafka] cmccabe commented on a diff in pull request #14115: KAFKA-15263 Check KRaftMigrationDriver state in each event
cmccabe commented on code in PR #14115: URL: https://github.com/apache/kafka/pull/14115#discussion_r1276615552 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -297,6 +297,16 @@ private boolean isValidStateChange(MigrationDriverState newState) { } } +private boolean checkDriverState(MigrationDriverState expectedState) { +if (migrationState.equals(expectedState)) { +return true; +} else { +log.debug("Expected driver state {} but found {}. Not running this event {}.", Review Comment: Should this be `info`? It won't happen often, will it?
[GitHub] [kafka] mumrah commented on a diff in pull request #14115: KAFKA-15263 Check KRaftMigrationDriver state in each event
mumrah commented on code in PR #14115: URL: https://github.com/apache/kafka/pull/14115#discussion_r1276614670 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -599,6 +604,9 @@ public void run() throws Exception { class MigrateMetadataEvent extends MigrationEvent { @Override public void run() throws Exception { +if (!checkDriverState(MigrationDriverState.ZK_MIGRATION)) { +return; +} Review Comment: This is the actual fix. The rest of the changes are for cleanliness/readability
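The race this fix addresses (see KAFKA-15263 below) can be sketched with a toy driver. The names and states here are simplified stand-ins for `KRaftMigrationDriver`, not the real class: two migrate-metadata events may be enqueued back to back, but because each event re-checks the driver state when it actually runs, the stale second event becomes a no-op.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class StateGuardSketch {
    enum DriverState { ZK_MIGRATION, KRAFT_MODE }

    private DriverState state = DriverState.ZK_MIGRATION;
    private int migrationsRun = 0;
    private final Queue<Runnable> eventQueue = new ArrayDeque<>();

    // Mirrors the checkDriverState idea: false when the driver has already
    // moved past the state this event was created for.
    private boolean checkDriverState(DriverState expected) {
        return state == expected;
    }

    void enqueueMigrateMetadataEvent() {
        eventQueue.add(() -> {
            if (!checkDriverState(DriverState.ZK_MIGRATION)) {
                return; // stale event: state changed since it was enqueued
            }
            migrationsRun++;
            state = DriverState.KRAFT_MODE; // transition once migration completes
        });
    }

    // Runs all queued events and reports how many migrations executed.
    int drainAndCountMigrations() {
        Runnable event;
        while ((event = eventQueue.poll()) != null) {
            event.run();
        }
        return migrationsRun;
    }
}
```

Without the state check at the top of the event body, draining two queued events would run the migration twice, which is exactly the bug described in the ticket.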
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276612481 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ## @@ -612,15 +616,14 @@ public void testNewGroupMetadataRecordThrowsWhenEmptyAssignment() { MetadataVersion.IBP_3_5_IV2 )); } - Review Comment: just a line after the test, I think it can stay
[GitHub] [kafka] rondagostino commented on a diff in pull request #14084: [MINOR] Add latest versions to kraft upgrade kafkatest
rondagostino commented on code in PR #14084: URL: https://github.com/apache/kafka/pull/14084#discussion_r1276607523 ## tests/kafkatest/version.py: ## @@ -252,3 +252,7 @@ def get_version(node=None): V_3_5_0 = KafkaVersion("3.5.0") V_3_5_1 = KafkaVersion("3.5.1") LATEST_3_5 = V_3_5_1 + +# 3.6.x versions +V_3_6_0 = KafkaVersion("3.6.0") +LATEST_3_6 = V_3_6_0 Review Comment: Seems odd to add this now since, as pointed out in https://github.com/apache/kafka/pull/13654#discussion_r1270835742, it won't be used. Seems this makes most sense to add when we bump `__version__ = '3.7.0.dev0'` in `tests/kafkatest/__init__.py`. To be clear (and to reiterate what is mentioned in that linked comment), it doesn't hurt to add this. But I'm wondering if I'm missing something since you decided to add it back.
[GitHub] [kafka] mumrah opened a new pull request, #14115: KAFKA-15263 Check KRaftMigrationDriver state in each event
mumrah opened a new pull request, #14115: URL: https://github.com/apache/kafka/pull/14115 To avoid processing events once we have transitioned to a new state, add a check at the beginning of the events created by PollEvent in KRaftMigrationDriver.
[GitHub] [kafka] kamalcph commented on pull request #14114: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs
kamalcph commented on PR #14114: URL: https://github.com/apache/kafka/pull/14114#issuecomment-1654008152 @satishd @divijvaidya @showuon PTAL when you get chance!
[jira] [Created] (KAFKA-15263) KRaftMigrationDriver can run the migration twice
David Arthur created KAFKA-15263: Summary: KRaftMigrationDriver can run the migration twice Key: KAFKA-15263 URL: https://issues.apache.org/jira/browse/KAFKA-15263 Project: Kafka Issue Type: Bug Reporter: David Arthur Assignee: David Arthur There is a narrow race condition in KRaftMigrationDriver where a PollEvent can run that sees the internal state as ZK_MIGRATION and is immediately followed by another poll event (due to external call to {{wakeup()}}) that results in two MigrateMetadataEvent being enqueued. Since MigrateMetadataEvent lacks a check on the internal state, this causes the metadata migration to occur twice.
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276573086 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java: ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain + * topic and partition metadata for the topics that the consumer group is subscribed to. + */ +public class SubscribedTopicMetadata implements SubscribedTopicDescriber { Review Comment: done!
[GitHub] [kafka] kamalcph opened a new pull request, #14114: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs
kamalcph opened a new pull request, #14114: URL: https://github.com/apache/kafka/pull/14114 **Topic -> Broker Synonym** - local.retention.bytes -> log.local.retention.bytes - local.retention.ms -> log.local.retention.ms We cannot add synonym for `remote.storage.enable` topic level config as it depends on [KIP-950](https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
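The synonym mechanism this PR adds can be illustrated with a minimal resolution sketch. This is a hand-rolled lookup for illustration only, not Kafka's actual `LogConfig`/`KafkaConfig` machinery: an explicit topic-level value wins, otherwise the broker-level synonym (the `log.`-prefixed name) supplies the default.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigSynonymSketch {
    // Resolves a topic-level config, falling back to its broker-level
    // synonym, which by convention carries a "log." prefix
    // (e.g. local.retention.ms -> log.local.retention.ms).
    public static String resolve(String topicKey,
                                 Map<String, String> topicConfig,
                                 Map<String, String> brokerConfig) {
        String topicValue = topicConfig.get(topicKey);
        if (topicValue != null) {
            return topicValue;
        }
        return brokerConfig.get("log." + topicKey);
    }

    public static void main(String[] args) {
        Map<String, String> broker = new HashMap<>();
        broker.put("log.local.retention.ms", "3600000");
        Map<String, String> topic = new HashMap<>();
        // No topic-level override, so the broker-level synonym applies.
        System.out.println(resolve("local.retention.ms", topic, broker)); // prints "3600000"
    }
}
```

The benefit of the synonym is exactly this fallback: operators can set a cluster-wide default once at the broker level while individual topics remain free to override it.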
[GitHub] [kafka] jolshan merged pull request #14088: MINOR: Adjust Invalid Record Exception for Invalid Txn State as mentioned in KIP-890
jolshan merged PR #14088: URL: https://github.com/apache/kafka/pull/14088
[GitHub] [kafka] clolov commented on a diff in pull request #14049: KAFKA-14038: Optimise calculation of size for log in remote tier
clolov commented on code in PR #14049: URL: https://github.com/apache/kafka/pull/14049#discussion_r1276543815

## storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java:

@@ -74,6 +74,11 @@ public void onPartitionLeadershipChanges(Set leaderPartitions,
 public void onStopPartitions(Set partitions) { }
+@Override
+public long remoteLogSize(TopicIdPartition topicPartition, int leaderEpoch) {

Review Comment: Great catch! Thank you!
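The `remoteLogSize(topicPartition, leaderEpoch)` API stubbed in the diff above returns the total size of the remote-tier segments for a given leader epoch. A self-contained sketch of that aggregation, using a hypothetical simplified `SegmentMetadata` record in place of Kafka's `RemoteLogSegmentMetadata`:

```java
import java.util.*;

// SegmentMetadata here is a hypothetical stand-in; the real API iterates
// RemoteLogSegmentMetadata entries from the RemoteLogMetadataManager.
class RemoteLogSizeSketch {
    record SegmentMetadata(int leaderEpoch, long sizeInBytes) {}

    // Sum the sizes of remote segments belonging to the given leader epoch --
    // the quantity KAFKA-14038 moves behind a single remoteLogSize() call
    // instead of having callers walk all segment metadata themselves.
    static long remoteLogSize(List<SegmentMetadata> segments, int leaderEpoch) {
        long total = 0L;
        for (SegmentMetadata s : segments) {
            if (s.leaderEpoch() == leaderEpoch) {
                total += s.sizeInBytes();
            }
        }
        return total;
    }
}
```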
[GitHub] [kafka] philipnee commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
philipnee commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1653959801 Hmm - you can try. But I've had similar, if not the same, failures and I haven't had a chance to examine the cause.
[GitHub] [kafka] abhijeetk88 opened a new pull request, #14113: KAFKA-15260: RLM Task should wait for RLMM to initialize
abhijeetk88 opened a new pull request, #14113: URL: https://github.com/apache/kafka/pull/14113
[jira] [Updated] (KAFKA-15262) MirrorHeartbeatConnector is not working as documented
[ https://issues.apache.org/jira/browse/KAFKA-15262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ravindranath Kakarla updated KAFKA-15262: - Description: As per the MM2 [KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]] the MirrorHeartbeatConnector should emit pings to heartbeat topic on the source cluster which then gets replicated to the target cluster. This can be used to demonstrate that MM2 is replicating the data. However, this is not happening right now. To the contrary, the MirrorHeartbeatConnector is producing heartbeat pings to target cluster instead of source. This is not much useful as it won't help detect problems connecting to source cluster or with the data replication. Is my understanding of the MirrorHeartbeatConnector accurate? *Reference:* _MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to demonstrate connectivity through the connectors. Downstream consumers can use this topic to verify that a) the connector is running and b) the corresponding source cluster is available. Heartbeats will get propagated by source and sink connectors s.t. chains like backup.us-west.us-east.heartbeat are possible._ [Code Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65] was: As per the MM2 [KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]] the MirrorHeartbeatConnector should emit pings to heartbeat topic on the source cluster which then gets replicated to the target cluster. This can be used to demonstrate that MM2 is replicating the data. h2. _Internal Topics_ _MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to demonstrate connectivity through the connectors. 
Downstream consumers can use this topic to verify that a) the connector is running and b) the corresponding source cluster is available. Heartbeats will get propagated by source and sink connectors s.t. chains like backup.us-west.us-east.heartbeat are possible._ However, this is not happening right now. To contrary, the MirrorHeartbeatConnector is producing heartbeat pings to target cluster instead of source. This is not much useful as it won't help detect problems connecting to source cluster or with the data replication. Is my understanding of the MirrorHeartbeatConnector accurate? [Code Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65] > MirrorHeartbeatConnector is not working as documented > - > > Key: KAFKA-15262 > URL: https://issues.apache.org/jira/browse/KAFKA-15262 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.8.0, 3.4.0, 3.5.0 >Reporter: Ravindranath Kakarla >Priority: Major > > As per the MM2 > [KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]] > the MirrorHeartbeatConnector should emit pings to heartbeat topic on the > source cluster which then gets replicated to the target cluster. This can be > used to demonstrate that MM2 is replicating the data. > However, this is not happening right now. To the contrary, the > MirrorHeartbeatConnector is producing heartbeat pings to target cluster > instead of source. This is not much useful as it won't help detect problems > connecting to source cluster or with the data replication. > Is my understanding of the MirrorHeartbeatConnector accurate? > *Reference:* > _MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated > to demonstrate connectivity through the connectors. 
Downstream consumers can > use this topic to verify that a) the connector is running and b) the > corresponding source cluster is available. Heartbeats will get propagated by > source and sink connectors s.t. chains like backup.us-west.us-east.heartbeat > are possible._ > [Code > Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1276487843

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:

@@ -624,8 +745,27 @@ public void testDeleteMember() {
     20
 );
-Uuid fooTopicId = context.addTopicMetadata("foo", 6);
-Uuid barTopicId = context.addTopicMetadata("bar", 6);
+Uuid fooTopicId = context.addTopicMetadata("foo", 6,
+    mkMap(
+        mkEntry(0, Collections.emptySet()),
+        mkEntry(1, Collections.emptySet()),
+        mkEntry(2, Collections.emptySet()),
+        mkEntry(3, Collections.emptySet()),
+        mkEntry(4, Collections.emptySet()),
+        mkEntry(5, Collections.emptySet())
+    )
+);
+
+Uuid barTopicId = context.addTopicMetadata("bar", 6,

Review Comment: Added racks to one of the tests, `testPartialAssignmentUpdate`, and removed them for the rest.
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1275444245

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:

@@ -624,8 +745,27 @@ public void testDeleteMember() {
     20
 );
-Uuid fooTopicId = context.addTopicMetadata("foo", 6);
-Uuid barTopicId = context.addTopicMetadata("bar", 6);
+Uuid fooTopicId = context.addTopicMetadata("foo", 6,
+    mkMap(
+        mkEntry(0, Collections.emptySet()),
+        mkEntry(1, Collections.emptySet()),
+        mkEntry(2, Collections.emptySet()),
+        mkEntry(3, Collections.emptySet()),
+        mkEntry(4, Collections.emptySet()),
+        mkEntry(5, Collections.emptySet())
+    )
+);
+
+Uuid barTopicId = context.addTopicMetadata("bar", 6,

Review Comment: https://github.com/apache/kafka/pull/14099/files/70a6214809fba59511dab58a1da21f74be4edbae#r1275362470
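The `mkMap` / `mkEntry` calls in the diff above are small varargs test helpers from Kafka's `Utils` class. A self-contained equivalent showing how the per-partition rack map is shaped is sketched below; the helper names mirror the real ones, but the bodies here are local stand-ins, and the topic/rack values are illustrative:

```java
import java.util.*;

// Local stand-ins for Kafka's Utils.mkMap / Utils.mkEntry test helpers.
class RackMetadataExample {
    @SafeVarargs
    static <K, V> Map<K, V> mkMap(Map.Entry<K, V>... entries) {
        Map<K, V> map = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : entries) {
            map.put(e.getKey(), e.getValue());
        }
        return map;
    }

    static <K, V> Map.Entry<K, V> mkEntry(K k, V v) {
        return new AbstractMap.SimpleImmutableEntry<>(k, v);
    }

    // Partition -> rack ids, in the shape the test above builds: an empty set
    // means "no rack information for this partition".
    static Map<Integer, Set<String>> fooPartitionRacks() {
        return mkMap(
            mkEntry(0, Set.of("rack-a", "rack-b")),
            mkEntry(1, Set.of("rack-b")),
            mkEntry(2, Collections.<String>emptySet())
        );
    }
}
```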
[jira] [Created] (KAFKA-15262) MirrorHeartbeatConnector is not working as documented
Ravindranath Kakarla created KAFKA-15262:

Summary: MirrorHeartbeatConnector is not working as documented
Key: KAFKA-15262
URL: https://issues.apache.org/jira/browse/KAFKA-15262
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 3.5.0, 3.4.0, 2.8.0
Reporter: Ravindranath Kakarla

As per MM2 [KIP-382|https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0], the MirrorHeartbeatConnector should emit pings to a heartbeat topic on the source cluster, which then gets replicated to the target cluster. This can be used to demonstrate that MM2 is replicating the data.

h2. Internal Topics
MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to demonstrate connectivity through the connectors. Downstream consumers can use this topic to verify that a) the connector is running and b) the corresponding source cluster is available. Heartbeats will get propagated by source and sink connectors s.t. chains like backup.us-west.us-east.heartbeat are possible.

However, this is not happening right now. To the contrary, the MirrorHeartbeatConnector is producing heartbeat pings to the target cluster instead of the source. This is not very useful, as it won't help detect problems connecting to the source cluster or with the data replication. Is my understanding of the MirrorHeartbeatConnector accurate?

[Code Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]
[jira] [Comment Edited] (KAFKA-14509) Add ConsumerGroupDescribe API
[ https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748216#comment-17748216 ]

Max Riedel edited comment on KAFKA-14509 at 7/27/23 3:40 PM:
-

Hey [~dajac], I started implementing the request/response schemas and classes and have several questions:

# I changed the ConsumerGroupDescribeResponse.json schema compared to what is stated [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI]. I think the changes are necessary to make it compile, but I'm not entirely sure I understood everything correctly.
# How do I set errorCode in ConsumerGroupDescribeResponse correctly? Other Response objects have a single errorCode field, while in this case we have a list of DescribedGroup entries, each of which can carry its own errorCode. Without setting this correctly I can't make the testSerialization test in RequestResponseTest.java pass.
# This also makes errorCounts in ConsumerGroupDescribeResponse more complicated, and I'm not sure I did the right thing.

You can have a look at my first draft [here|https://github.com/riedelmax/kafka/commit/e1818eee1347743ac71237e1db16de57231dfc11]. Please let us discuss my questions soon :)

> Add ConsumerGroupDescribe API
> -
>
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
> Issue Type: Sub-task
> Reporter: David Jacot
> Assignee: Max Riedel
> Priority: Major
>
> The goal of this task is to implement the ConsumerGroupDescribe API as described [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI]; and to implement the related changes in the admin client as described [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups].
> On the server side, this mainly requires the following steps:
> # The request/response schemas must be defined (see ListGroupsRequest/Response.json for an example);
> # Request/response classes must be defined (see ListGroupsRequest/Response.java for an example);
> # The API must be defined in KafkaApis (see KafkaApis#handleDescribeGroupsRequest for an example);
> # The GroupCoordinator interface (java file) must be extended for the new operations.
> # The new operation must be implemented in GroupCoordinatorService (new coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in Scala) should just reject the request.
> We could probably do 1) and 2) in one pull request and the remaining ones in another.
> On the admin client side, this mainly requires the following steps:
> * Define all the new java classes as defined in the KIP.
> * Add the new API to the KafkaAdminClient class.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
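On question 3 above, a common pattern for a response whose error codes live on the individual entries (rather than at the top level) is to aggregate the per-entry codes into a count map. The sketch below is hypothetical — `ConsumerGroupDescribeResponseSketch` and its inner `DescribedGroup` are illustrative stand-ins, not the actual Kafka classes:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a response class whose error codes are carried by the
// individual described groups rather than by a single top-level field.
class ConsumerGroupDescribeResponseSketch {

    static class DescribedGroup {
        final short errorCode;
        DescribedGroup(short errorCode) { this.errorCode = errorCode; }
    }

    private final List<DescribedGroup> groups;

    ConsumerGroupDescribeResponseSketch(List<DescribedGroup> groups) {
        this.groups = groups;
    }

    // Aggregate the per-group error codes into a map from error code to
    // occurrence count, which is the shape an errorCounts() method returns.
    Map<Short, Integer> errorCounts() {
        Map<Short, Integer> counts = new HashMap<>();
        for (DescribedGroup group : groups) {
            counts.merge(group.errorCode, 1, Integer::sum);
        }
        return counts;
    }
}
```

With this shape, a serialization round-trip test can compare the aggregated map rather than a single top-level code.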
[GitHub] [kafka] mimaison commented on pull request #13671: KAFKA-14967: fix NPE in MockAdminClient CreateTopicsResult
mimaison commented on PR #13671: URL: https://github.com/apache/kafka/pull/13671#issuecomment-1653859939

Thanks @hertzsprung for the PR! No code in Kafka relies on the content of CreateTopicsResult, and I just realized MockAdminClient is not part of the public API, so this is effectively "dead code". That said, I see that we've tried in the past to fix consistency issues between MockAdminClient and the real Admin implementation, so I think we can still merge this.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abhijeetk88 opened a new pull request, #14112: KAFKA-15261: Do not block replica fetcher if RLMM is not initialized
abhijeetk88 opened a new pull request, #14112: URL: https://github.com/apache/kafka/pull/14112

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ex172000 commented on pull request #14110: MINOR: Add test for describe topic with ID
ex172000 commented on PR #14110: URL: https://github.com/apache/kafka/pull/14110#issuecomment-1653814175

Some tests failed, but the failures are unrelated: all failing tests are integration tests, and this is a unit-test-only change.
[GitHub] [kafka] jsancio commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay
jsancio commented on code in PR #13643: URL: https://github.com/apache/kafka/pull/13643#discussion_r1276417011

## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
## @@ -2394,6 +2394,11 @@ public Optional latestSnapshotId() {
     return log.latestSnapshotId();
 }

+@Override
+public long logEndOffset() {
+    return log.endOffset().offset;
+}

Review Comment:
> Hmm... the method name is "logEndOffset." So it should just return the log end offset, right? Returning something else would be misleading.

@cmccabe, I don't follow this comment. When the client calls `KafkaRaftClient::schedule{Atomic}Append`, the `KafkaRaftClient` compares the provided offset with the `nextOffset` stored in the `BatchAccumulator`. If we want this method to succeed in most cases, `KafkaRaftClient::logEndOffset` should return that offset, `BatchAccumulator::nextOffset`, and not the log end offset. Maybe `logEndOffset` is not a great name. I am okay renaming this to `KafkaRaftClient::endOffset()` but I am open to suggestions.

## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
## @@ -355,7 +355,8 @@ public void testAppendFailedWithRecordBatchTooLargeException() throws Exception
     for (int i = 0; i < size; i++)
         batchToLarge.add("a");

-assertThrows(RecordBatchTooLargeException.class, () -> context.client.scheduleAtomicAppend(epoch, batchToLarge));
+assertThrows(RecordBatchTooLargeException.class,
+    () -> context.client.scheduleAtomicAppend(epoch, OptionalLong.empty(), batchToLarge));

Review Comment:
I couldn't find a test for the new `KafkaRaftClient::scheduleAtomicAppend`.

## raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
## @@ -171,16 +172,21 @@ default void beginShutdown() {}
  * to resign its leadership. The state machine is expected to discard all
  * uncommitted entries after observing an epoch change.
  *
+ * If the current base offset does not match the supplied required base offset,
+ * then this method will throw {@link UnexpectedBaseOffsetException}.
+ *
  * @param epoch the current leader epoch
+ * @param requiredBaseOffset if this is set, it is the offset we must use as the base offset.
  * @param records the list of records to append
  * @return the expected offset of the last record if the append succeeds
  * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum
  *         batch size; if this exception is thrown, none of the elements in records were committed
  * @throws NotLeaderException if we are not the current leader or the epoch doesn't match the leader epoch
  * @throws BufferAllocationException we failed to allocate memory for the records
  * @throws UnexpectedBaseOffsetException the requested base offset could not be obtained.
  */
-long scheduleAtomicAppend(int epoch, List records);
+long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List records);

Review Comment:
What's the argument/reason for adding this functionality to `scheduleAtomicAppend` and not `scheduleAppend`?
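The base-offset check being debated above can be illustrated with a small sketch. The names below (`BatchAccumulatorSketch`, `append`, `nextOffset`) are hypothetical and do not match the real `KafkaRaftClient`/`BatchAccumulator` code; the point is only the contract: an append carrying a required base offset succeeds only if that offset equals the accumulator's next offset.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the required-base-offset contract discussed in the
// review; illustrative names, not the actual Kafka raft implementation.
class BatchAccumulatorSketch {

    private long nextOffset;

    BatchAccumulatorSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    // The offset the next appended record would receive. This is the value the
    // review argues callers need (rather than the log end offset), so they can
    // pass it back as the required base offset.
    long nextOffset() {
        return nextOffset;
    }

    // Accept the append only when the caller's required base offset (if any)
    // matches the accumulator's next offset; otherwise fail fast.
    long append(OptionalLong requiredBaseOffset, int recordCount) {
        if (requiredBaseOffset.isPresent() && requiredBaseOffset.getAsLong() != nextOffset) {
            throw new IllegalStateException("Unexpected base offset: required "
                + requiredBaseOffset.getAsLong() + " but next offset is " + nextOffset);
        }
        long lastOffset = nextOffset + recordCount - 1;
        nextOffset += recordCount;
        return lastOffset; // offset of the last appended record
    }
}
```

Under this contract, a caller that reads the exposed next offset and immediately passes it back succeeds, while a stale or wrong required offset is rejected before anything is appended.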