[GitHub] [kafka] lihaosky commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-27 Thread via GitHub


lihaosky commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1277119690


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
  final AssignmentConfigs assignmentConfigs) {
 this.fullMetadata = fullMetadata;
 this.partitionsForTask = partitionsForTask;
-this.racksForProcessConsumer = racksForProcessConsumer;
 this.internalTopicManager = internalTopicManager;
 this.assignmentConfigs = assignmentConfigs;
 this.racksForPartition = new HashMap<>();
 this.racksForProcess = new HashMap<>();
+validateClientRack(racksForProcessConsumer);

Review Comment:
   Yeah. Reverted and introduced `validClientRack` to track it
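   For context, a minimal sketch of the pattern being described (illustrative only, not the PR's exact code; the field name `validClientRack` matches the comment, the map shape and validation rule are assumptions):
   
   ```java
   import java.util.Map;
   import java.util.Optional;
   import java.util.UUID;
   
   public class RackAwareTaskAssignorSketch {
       // Track validity in a flag instead of throwing from the constructor,
       // so the caller can probe it later without a try/catch.
       private final boolean validClientRack;
   
       public RackAwareTaskAssignorSketch(
               final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer) {
           this.validClientRack = validateClientRack(racksForProcessConsumer);
       }
   
       public boolean validClientRack() {
           return validClientRack;
       }
   
       private static boolean validateClientRack(
               final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer) {
           // Assumed rule for illustration: every consumer of a process reports a rack.
           return racksForProcessConsumer.values().stream()
               .allMatch(racks -> racks.values().stream().allMatch(Optional::isPresent));
       }
   }
   ```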



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd merged pull request #14049: KAFKA-14038: Optimise calculation of size for log in remote tier

2023-07-27 Thread via GitHub


satishd merged PR #14049:
URL: https://github.com/apache/kafka/pull/14049


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #14049: KAFKA-14038: Optimise calculation of size for log in remote tier

2023-07-27 Thread via GitHub


satishd commented on PR #14049:
URL: https://github.com/apache/kafka/pull/14049#issuecomment-1655051965

   A few unrelated test failures in Jenkins jobs, merging it to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15231) Add ability to pause/resume Remote Log Manager tasks

2023-07-27 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748406#comment-17748406
 ] 

Satish Duggana edited comment on KAFKA-15231 at 7/28/23 4:48 AM:
-

There can be scenarios where you want to control more on remote read/write 
operations. One way to control respective read and write throughputs is to set 
respective quotas. This will be addressed with 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas 
which is in Draft state.


was (Author: satish.duggana):
There can be scenarios where you want to control more on remote read/write 
operations. One way to control respective read and write throughputs is to set 
respective quotas. This will be addressed with 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas

> Add ability to pause/resume Remote Log Manager tasks 
> -
>
> Key: KAFKA-15231
> URL: https://issues.apache.org/jira/browse/KAFKA-15231
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: tiered-storage
>
> Once Tiered Storage is enabled, there may be situations where uploading tasks 
> to a remote tier need to be paused, e.g. for remote storage maintenance, 
> troubleshooting, etc.
> An RSM implementation may not be able to do this by itself without throwing 
> exceptions, polluting the logs, etc.
> Could we consider adding this ability to the Tiered Storage framework? Remote 
> Log Manager seems like a good candidate place for this, though I'm wondering 
> how to expose it.
> Would be interested to hear if this sounds like a good idea, and what options 
> we have to implement it.
> We have been considering extending RLM tasks with a pause flag, and having an 
> MBean to switch them on demand. Another option may be to extend the Kafka 
> protocol to expose this – but that seems much more involved.
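A bare-bones sketch of the pause-flag-plus-MBean idea mentioned above (all names here are made up for illustration; this is not proposed Kafka code):

{code:java}
// --- RemoteLogTaskControlMBean.java --- (hypothetical)
public interface RemoteLogTaskControlMBean {
    boolean isPaused();
    void setPaused(boolean paused);
}

// --- RemoteLogTaskControl.java --- (hypothetical)
import java.lang.management.ManagementFactory;
import javax.management.ObjectName;

public class RemoteLogTaskControl implements RemoteLogTaskControlMBean {
    private volatile boolean paused = false;

    @Override public boolean isPaused() { return paused; }
    @Override public void setPaused(final boolean paused) { this.paused = paused; }

    public static void main(final String[] args) throws Exception {
        final RemoteLogTaskControl control = new RemoteLogTaskControl();
        // Expose the flag over JMX so operators can flip it on demand.
        ManagementFactory.getPlatformMBeanServer().registerMBean(
            control, new ObjectName("kafka.log.remote:type=RemoteLogTaskControl"));
        // An RLM upload task would then consult the flag on every run:
        // if (control.isPaused()) return; // skip quietly: no exceptions, no log noise
    }
}
{code}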



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15231) Add ability to pause/resume Remote Log Manager tasks

2023-07-27 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748406#comment-17748406
 ] 

Satish Duggana commented on KAFKA-15231:


There can be scenarios where you want to control more on remote read/write 
operations. One way to control respective read and write throughputs is to set 
respective quotas. This will be addressed with 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
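A rough illustration of what such quotas could look like as broker settings; the config names below are taken from the draft KIP-956 and may change before the KIP is adopted:

{code:java}
import java.util.Properties;

public class TieredStorageQuotaSketch {
    public static void main(final String[] args) {
        final Properties brokerProps = new Properties();
        // Assumed draft-KIP-956 name: cap bytes/sec copied to remote storage.
        brokerProps.setProperty("remote.log.manager.copy.max.bytes.per.second", "52428800");   // 50 MB/s
        // Assumed draft-KIP-956 name: cap bytes/sec fetched from remote storage.
        brokerProps.setProperty("remote.log.manager.fetch.max.bytes.per.second", "104857600"); // 100 MB/s
        brokerProps.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
{code}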

> Add ability to pause/resume Remote Log Manager tasks 
> -
>
> Key: KAFKA-15231
> URL: https://issues.apache.org/jira/browse/KAFKA-15231
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: tiered-storage
>
> Once Tiered Storage is enabled, there may be situations where needed to pause 
> uploading tasks to a remote-tier. e.g. remote storage maintenance, 
> troubleshooting, etc.
> An RSM implementation may not be able to do this by itself without throwing 
> exceptions, polluting the logs, etc.
> Could we consider adding this ability to the Tiered Storage framework? Remote 
> Log Manager seems like a good candidate place for this; though I'm wondering 
> on how to expose it.
> Would be interested to hear if this sounds like a good idea, and what options 
> we have to include these.
> We have been considering extending RLM tasks with a pause flag, and having an 
> MBean to switch them on demand. Another option may be to extend the Kafka 
> protocol to expose this – but seems much moved involved.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15265) Remote copy/fetch quotas for tiered storage.

2023-07-27 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15265:
---
Description: Related KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas

> Remote copy/fetch quotas for tiered storage.
> 
>
> Key: KAFKA-15265
> URL: https://issues.apache.org/jira/browse/KAFKA-15265
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Assignee: Abhijeet Kumar
>Priority: Major
>
> Related KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15265) Remote copy/fetch quotas for tiered storage.

2023-07-27 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-15265:
--

 Summary: Remote copy/fetch quotas for tiered storage.
 Key: KAFKA-15265
 URL: https://issues.apache.org/jira/browse/KAFKA-15265
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Satish Duggana
Assignee: Abhijeet Kumar






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15214) Add metrics for OffsetOutOfRangeException when tiered storage is enabled

2023-07-27 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748402#comment-17748402
 ] 

Satish Duggana commented on KAFKA-15214:


[~yaolixin]  Is the client trying to fetch the remote data and encountering 
OffsetOutOfRangeException? What offset was the client trying to fetch?

We should not count OffsetOutOfRangeException toward RemoteReadErrorsPerSec, 
because OffsetOutOfRangeException is a valid error and may not be related to 
any failure while serving remote reads.

We can add more logs if needed for better clarity on the errors encountered.
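To make the distinction concrete, a sketch of how a read path could classify errors (method, field, and type names below are placeholders, not actual Kafka code):

{code:java}
import org.apache.kafka.common.errors.OffsetOutOfRangeException;

// Placeholder method; assumes remoteLogReader and remoteReadErrorsPerSec exist.
FetchDataInfo readRemote(final RemoteStorageFetchInfo fetchInfo) throws Exception {
    try {
        return remoteLogReader.read(fetchInfo); // placeholder remote read call
    } catch (final OffsetOutOfRangeException e) {
        // Valid client-facing condition: propagate it, but do NOT count it
        // toward RemoteReadErrorsPerSec.
        throw e;
    } catch (final Exception e) {
        remoteReadErrorsPerSec.mark(); // placeholder meter: genuine remote-read failure
        throw e;
    }
}
{code}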

> Add metrics for OffsetOutOfRangeException when tiered storage is enabled
> 
>
> Key: KAFKA-15214
> URL: https://issues.apache.org/jira/browse/KAFKA-15214
> Project: Kafka
>  Issue Type: Sub-task
>  Components: metrics
>Affects Versions: 3.6.0
>Reporter: Lixin Yao
>Priority: Minor
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> In the current metrics RemoteReadErrorsPerSec, the exception type 
> OffsetOutOfRangeException is not included.
> In our testing with tiered storage feature (at Apple), we noticed several 
> cases where remote download is affected and stuck due to repeatedly 
> OffsetOutOfRangeException in some particular broker or topic partitions. The 
> root cause could be various but currently without a metrics it's very hard to 
> catch this issue and debug in a timely fashion. It's understandable that the 
> exception itself could not be the root cause but this exception metric could 
> be a good metrics for us to alert and investigate.
> Related discussion
> [https://github.com/apache/kafka/pull/13944#discussion_r1266243006]
> I am happy to contribute to this if the request is agreed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] github-actions[bot] commented on pull request #13568: KAFKA-14906:Extract the coordinator service log from server log

2023-07-27 Thread via GitHub


github-actions[bot] commented on PR #13568:
URL: https://github.com/apache/kafka/pull/13568#issuecomment-1654936322

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-27 Thread via GitHub


mjsax commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1277018411


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:
##
@@ -313,20 +315,145 @@ public void 
shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
 );
 }
 
+@Test
+public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() {
+final RackAwareTaskAssignor rackAwareTaskAssignor = 
mock(RackAwareTaskAssignor.class);
+
when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true);
+final Map<UUID, String> racksForProcess = mkMap(
+mkEntry(UUID_1, "rack1"),
+mkEntry(UUID_2, "rack2"),
+mkEntry(UUID_3, "rack3"),
+mkEntry(UUID_4, "rack1"),
+mkEntry(UUID_5, "rack2"),
+mkEntry(UUID_6, "rack3"),
+mkEntry(UUID_7, "rack1"),
+mkEntry(UUID_8, "rack2"),
+mkEntry(UUID_9, "rack3")
+);
+
when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess);
+final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2);
+standbyTaskAssignor = 
StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor);
+verify(rackAwareTaskAssignor, times(1)).racksForProcess();
+
+final Map<UUID, ClientState> clientStates = mkMap(
+mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(), 
TASK_0_0, TASK_1_0)),
+mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(), 
TASK_0_1, TASK_1_1)),
+mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(), 
TASK_0_2, TASK_1_2)),
+
+mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())),
+mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())),
+mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())),
+
+mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())),
+mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap())),
+mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap()))
+);
+
+final Map<UUID, ClientState> clientStatesWithTags = mkMap(
+mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)),
+mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)),
+mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)),
+
+mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_3)))),
+
+mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_3))))
+);
+
+final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
+
+standbyTaskAssignor.assign(clientStates, allActiveTasks, 
allActiveTasks, assignmentConfigs);
+
+final AssignmentConfigs assignmentConfigsWithTags = 
newAssignmentConfigs(2, ZONE_TAG);
+standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
+standbyTaskAssignor.assign(clientStatesWithTags, allActiveTasks, 
allActiveTasks, assignmentConfigsWithTags);
+
+Stream.of(clientStates, clientStatesWithTags).forEach(
+cs -> {
+
assertTrue(cs.values().stream().allMatch(ClientState::reachedCapacity));
+Stream.of(UUID_1, UUID_2, UUID_3)
+.forEach(client -> 
assertStandbyTaskCountForClientEqualsTo(cs, client, 0));
+Stream.of(UUID_4, UUID_5, UUID_6, UUID_7, UUID_8, UUID_9)
+.forEach(client -> 
assertStandbyTaskCountForClientEqualsTo(cs, client, 2));
+assertTotalNumberOfStandbyTasksEqualsTo(cs, 12);
+
+assertTrue(
+standbyClientsHonorRackAwareness(
+TASK_0_0,
+cs,
+singletonList(
+mkSet(UUID_6, UUID_8)

Review Comment:
   Thanks. So my understanding was actually correct.
   
   Follow up question: should we keep this test tight as it is right now, or 
should we actually say we only verify that the algorithm does not pick 
anything in zone_1 (ie, not `UUID_1, 4, 7`), and thus pass in all _valid_ 
UUIDs from all different zones instead, ie, we pass in `UUID_2, 5, 6, 8, 9`?
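   In code, the looser check could look like this (a sketch reusing the test's existing helpers):
   
   ```java
   // Only assert that no standby lands in zone_1, by listing every client
   // outside that zone as acceptable.
   assertTrue(
       standbyClientsHonorRackAwareness(
           TASK_0_0,
           cs,
           singletonList(
               mkSet(UUID_2, UUID_5, UUID_6, UUID_8, UUID_9)
           )
       )
   );
   ```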
   
  

[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-27 Thread via GitHub


mjsax commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1277018411


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java:
##
@@ -313,20 +315,145 @@ public void 
shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
 );
 }
 
+@Test
+public void shouldDistributeStandbyTasksUsingFunctionAndSupplierTags() {
+final RackAwareTaskAssignor rackAwareTaskAssignor = 
mock(RackAwareTaskAssignor.class);
+
when(rackAwareTaskAssignor.canEnableRackAwareAssignor()).thenReturn(true);
+final Map<UUID, String> racksForProcess = mkMap(
+mkEntry(UUID_1, "rack1"),
+mkEntry(UUID_2, "rack2"),
+mkEntry(UUID_3, "rack3"),
+mkEntry(UUID_4, "rack1"),
+mkEntry(UUID_5, "rack2"),
+mkEntry(UUID_6, "rack3"),
+mkEntry(UUID_7, "rack1"),
+mkEntry(UUID_8, "rack2"),
+mkEntry(UUID_9, "rack3")
+);
+
when(rackAwareTaskAssignor.racksForProcess()).thenReturn(racksForProcess);
+final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2);
+standbyTaskAssignor = 
StandbyTaskAssignorFactory.create(assignmentConfigs, rackAwareTaskAssignor);
+verify(rackAwareTaskAssignor, times(1)).racksForProcess();
+
+final Map<UUID, ClientState> clientStates = mkMap(
+mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, mkMap(), 
TASK_0_0, TASK_1_0)),
+mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, mkMap(), 
TASK_0_1, TASK_1_1)),
+mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, mkMap(), 
TASK_0_2, TASK_1_2)),
+
+mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, mkMap())),
+mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, mkMap())),
+mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, mkMap())),
+
+mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, mkMap())),
+mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, mkMap())),
+mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, mkMap()))
+);
+
+final Map<UUID, ClientState> clientStatesWithTags = mkMap(
+mkEntry(UUID_1, createClientStateWithCapacity(UUID_1, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0, TASK_1_0)),
+mkEntry(UUID_2, createClientStateWithCapacity(UUID_2, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_2)), TASK_0_1, TASK_1_1)),
+mkEntry(UUID_3, createClientStateWithCapacity(UUID_3, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_3)), TASK_0_2, TASK_1_2)),
+
+mkEntry(UUID_4, createClientStateWithCapacity(UUID_4, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+mkEntry(UUID_5, createClientStateWithCapacity(UUID_5, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+mkEntry(UUID_6, createClientStateWithCapacity(UUID_6, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_3)))),
+
+mkEntry(UUID_7, createClientStateWithCapacity(UUID_7, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_1)))),
+mkEntry(UUID_8, createClientStateWithCapacity(UUID_8, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_2)))),
+mkEntry(UUID_9, createClientStateWithCapacity(UUID_9, 2, 
mkMap(mkEntry(ZONE_TAG, ZONE_3))))
+);
+
+final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
+
+standbyTaskAssignor.assign(clientStates, allActiveTasks, 
allActiveTasks, assignmentConfigs);
+
+final AssignmentConfigs assignmentConfigsWithTags = 
newAssignmentConfigs(2, ZONE_TAG);
+standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
+standbyTaskAssignor.assign(clientStatesWithTags, allActiveTasks, 
allActiveTasks, assignmentConfigsWithTags);
+
+Stream.of(clientStates, clientStatesWithTags).forEach(
+cs -> {
+
assertTrue(cs.values().stream().allMatch(ClientState::reachedCapacity));
+Stream.of(UUID_1, UUID_2, UUID_3)
+.forEach(client -> 
assertStandbyTaskCountForClientEqualsTo(cs, client, 0));
+Stream.of(UUID_4, UUID_5, UUID_6, UUID_7, UUID_8, UUID_9)
+.forEach(client -> 
assertStandbyTaskCountForClientEqualsTo(cs, client, 2));
+assertTotalNumberOfStandbyTasksEqualsTo(cs, 12);
+
+assertTrue(
+standbyClientsHonorRackAwareness(
+TASK_0_0,
+cs,
+singletonList(
+mkSet(UUID_6, UUID_8)

Review Comment:
   > The algorithm eventually picked 6 and 8. 
   
   Ah. So my understanding was actually correct.
   
   Follow up question: should we make this test tight as it is right now, or 
should we actually say we only verify that the algorithm does not pick 
anything in zone_1, and thus pass in all _valid_ UUIDs from different zones 
instead, ie, we pass in `UUID_2, 5, 6, 8, 9`?

[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-27 Thread via GitHub


mjsax commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1277015051


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
  final AssignmentConfigs assignmentConfigs) {
 this.fullMetadata = fullMetadata;
 this.partitionsForTask = partitionsForTask;
-this.racksForProcessConsumer = racksForProcessConsumer;
 this.internalTopicManager = internalTopicManager;
 this.assignmentConfigs = assignmentConfigs;
 this.racksForPartition = new HashMap<>();
 this.racksForProcess = new HashMap<>();
+validateClientRack(racksForProcessConsumer);

Review Comment:
   I think we should document that we might throw here, because when we create 
`RackAwareTaskAssignor` we need to catch this exception to ensure we do not 
kill the `StreamThread`?
   
   Now that I am realizing this, I am actually wondering if it's a good idea to 
handle it this way? In the end, for the caller it might be better to figure 
out whether the `RackAwareTaskAssignor` can be used or not via a method call 
before we create it? Maybe we need some `static` method into which we pass 
`racksForProcessConsumer`? -- Any other idea to make it more natural for the 
caller?
   
   If we think handling the exception is fine for the caller, also ok with me. 
Just asking.
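   One possible shape for that `static` approach (hypothetical sketch, not PR code; the validation rule is an assumption):
   
   ```java
   // Caller probes validity first, so constructing RackAwareTaskAssignor
   // never throws and the StreamThread is never at risk.
   public static boolean validateClientRack(
           final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer) {
       // Assumed rule: each process's consumers must all report the same non-empty rack.
       return racksForProcessConsumer.values().stream().allMatch(consumerRacks ->
           consumerRacks.values().stream().allMatch(Optional::isPresent)
               && consumerRacks.values().stream().map(Optional::get).distinct().count() <= 1);
   }
   
   // Usage at the call site:
   // if (RackAwareTaskAssignor.validateClientRack(racksForProcessConsumer)) {
   //     rackAwareTaskAssignor = new RackAwareTaskAssignor(...);
   // }
   ```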



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-27 Thread via GitHub


mjsax commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1277015051


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
  final AssignmentConfigs assignmentConfigs) {
 this.fullMetadata = fullMetadata;
 this.partitionsForTask = partitionsForTask;
-this.racksForProcessConsumer = racksForProcessConsumer;
 this.internalTopicManager = internalTopicManager;
 this.assignmentConfigs = assignmentConfigs;
 this.racksForPartition = new HashMap<>();
 this.racksForProcess = new HashMap<>();
+validateClientRack(racksForProcessConsumer);

Review Comment:
   I think we should document that we might throw here, because when we create 
`RackAwareTaskAssignor` we need to catch this exception to ensure we do not 
kill the `StreamThread`?
   
   Now that I am realizing this, I am actually wondering if it's a good idea to 
handle it this way? In the end, for the caller it might be better to figure 
out whether the `RackAwareTaskAssignor` can be used or not via a method call 
before we create it? Maybe we need some `static` method into which we pass 
`racksForProcessConsumer`? -- Any other idea to make it more natural for the 
caller?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-27 Thread via GitHub


mjsax commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1277015051


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
  final AssignmentConfigs assignmentConfigs) {
 this.fullMetadata = fullMetadata;
 this.partitionsForTask = partitionsForTask;
-this.racksForProcessConsumer = racksForProcessConsumer;
 this.internalTopicManager = internalTopicManager;
 this.assignmentConfigs = assignmentConfigs;
 this.racksForPartition = new HashMap<>();
 this.racksForProcess = new HashMap<>();
+validateClientRack(racksForProcessConsumer);

Review Comment:
   I think we should document that we might throw here, because when we create 
`RackAwareTaskAssignor` we need to catch this exception to ensure we do not 
kill the `StreamThread`?
   
   Now that I am realizing this, I am actually wondering if it's a good idea to 
handle it this way? In the end, for the caller it might be better to figure 
out whether the `RackAwareTaskAssignor` can be used or not via a method call 
before we create it? Maybe we need some `static` method? -- Any other idea to 
make it more natural for the caller?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #14097: KAFKA-15022: [4/N] use client tag assignor for rack aware standby task assignment

2023-07-27 Thread via GitHub


mjsax commented on code in PR #14097:
URL: https://github.com/apache/kafka/pull/14097#discussion_r1277014726


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -63,11 +62,11 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
  final AssignmentConfigs assignmentConfigs) {
 this.fullMetadata = fullMetadata;
 this.partitionsForTask = partitionsForTask;
-this.racksForProcessConsumer = racksForProcessConsumer;

Review Comment:
   We can also remove `racksForProcessConsumer` from the parameter list.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter

2023-07-27 Thread jianbin.chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jianbin.chen updated KAFKA-15264:
-
 Attachment: image-2023-07-28-09-52-38-941.png
Description: 
I was preparing to upgrade from 1.1.0 to 3.5.1's KRaft mode (new cluster 
deployment). In recent comparison testing I found an obvious throughput gap 
when using the following stress test command:

 
{code:java}
./kafka-producer-perf-test.sh --topic test321 --num-records 3000 
--record-size 1024 --throughput -1 --producer-props bootstrap.servers=xxx: 
acks=1
419813 records sent, 83962.6 records/sec (81.99 MB/sec), 241.1 ms avg latency, 
588.0 ms max latency.
555300 records sent, 111015.6 records/sec (108.41 MB/sec), 275.1 ms avg 
latency, 460.0 ms max latency.
552795 records sent, 110536.9 records/sec (107.95 MB/sec), 265.9 ms avg 
latency, 1120.0 ms max latency.
552600 records sent, 110520.0 records/sec (107.93 MB/sec), 284.5 ms avg 
latency, 1097.0 ms max latency.
538500 records sent, 107656.9 records/sec (105.13 MB/sec), 277.5 ms avg 
latency, 610.0 ms max latency.
511545 records sent, 102309.0 records/sec (99.91 MB/sec), 304.1 ms avg latency, 
1892.0 ms max latency.
511890 records sent, 102337.1 records/sec (99.94 MB/sec), 288.4 ms avg latency, 
3000.0 ms max latency.
519165 records sent, 103812.2 records/sec (101.38 MB/sec), 262.1 ms avg 
latency, 1781.0 ms max latency.
513555 records sent, 102669.9 records/sec (100.26 MB/sec), 338.2 ms avg 
latency, 2590.0 ms max latency.
463329 records sent, 92665.8 records/sec (90.49 MB/sec), 276.8 ms avg latency, 
1463.0 ms max latency.
494248 records sent, 98849.6 records/sec (96.53 MB/sec), 327.2 ms avg latency, 
2362.0 ms max latency.
506272 records sent, 101254.4 records/sec (98.88 MB/sec), 322.1 ms avg latency, 
2986.0 ms max latency.
393758 records sent, 78735.9 records/sec (76.89 MB/sec), 387.0 ms avg latency, 
2958.0 ms max latency.
426435 records sent, 85252.9 records/sec (83.25 MB/sec), 363.3 ms avg latency, 
1959.0 ms max latency.
412560 records sent, 82298.0 records/sec (80.37 MB/sec), 374.1 ms avg latency, 
1995.0 ms max latency.
370137 records sent, 73997.8 records/sec (72.26 MB/sec), 396.8 ms avg latency, 
1496.0 ms max latency.
391781 records sent, 78340.5 records/sec (76.50 MB/sec), 410.7 ms avg latency, 
2446.0 ms max latency.
355901 records sent, 71166.0 records/sec (69.50 MB/sec), 397.5 ms avg latency, 
2715.0 ms max latency.
385410 records sent, 77082.0 records/sec (75.28 MB/sec), 417.5 ms avg latency, 
2702.0 ms max latency.
381160 records sent, 76232.0 records/sec (74.45 MB/sec), 407.7 ms avg latency, 
1846.0 ms max latency.
67 records sent, 0.1 records/sec (65.10 MB/sec), 456.2 ms avg latency, 
1414.0 ms max latency.
376251 records sent, 75175.0 records/sec (73.41 MB/sec), 401.9 ms avg latency, 
1897.0 ms max latency.
354434 records sent, 70886.8 records/sec (69.23 MB/sec), 425.8 ms avg latency, 
1601.0 ms max latency.
353795 records sent, 70744.9 records/sec (69.09 MB/sec), 411.7 ms avg latency, 
1563.0 ms max latency.
321993 records sent, 64360.0 records/sec (62.85 MB/sec), 447.3 ms avg latency, 
1975.0 ms max latency.
404075 records sent, 80750.4 records/sec (78.86 MB/sec), 408.4 ms avg latency, 
1753.0 ms max latency.
384526 records sent, 76905.2 records/sec (75.10 MB/sec), 406.0 ms avg latency, 
1833.0 ms max latency.
387652 records sent, 77483.9 records/sec (75.67 MB/sec), 397.3 ms avg latency, 
1927.0 ms max latency.
343286 records sent, 68629.7 records/sec (67.02 MB/sec), 455.6 ms avg latency, 
1685.0 ms max latency.
00 records sent, 66646.7 records/sec (65.08 MB/sec), 456.6 ms avg latency, 
2146.0 ms max latency.
361191 records sent, 72238.2 records/sec (70.55 MB/sec), 409.4 ms avg latency, 
2125.0 ms max latency.
357525 records sent, 71490.7 records/sec (69.82 MB/sec), 436.0 ms avg latency, 
1502.0 ms max latency.
340238 records sent, 68047.6 records/sec (66.45 MB/sec), 427.9 ms avg latency, 
1932.0 ms max latency.
390016 records sent, 77956.4 records/sec (76.13 MB/sec), 418.5 ms avg latency, 
1807.0 ms max latency.
352830 records sent, 70523.7 records/sec (68.87 MB/sec), 439.4 ms avg latency, 
1892.0 ms max latency.
354526 records sent, 70905.2 records/sec (69.24 MB/sec), 429.6 ms avg latency, 
2128.0 ms max latency.
356670 records sent, 71305.5 records/sec (69.63 MB/sec), 408.9 ms avg latency, 
1329.0 ms max latency.
309204 records sent, 60687.7 records/sec (59.27 MB/sec), 438.6 ms avg latency, 
2566.0 ms max latency.
366715 records sent, 72316.1 records/sec (70.62 MB/sec), 474.5 ms avg latency, 
2169.0 ms max latency.
375174 records sent, 75034.8 records/sec (73.28 MB/sec), 429.9 ms avg latency, 
1722.0 ms max latency.
359400 records sent, 70346.4 records/sec (68.70 MB/sec), 432.1 ms avg latency, 
1961.0 ms max latency.
312276 records sent, 62430.2 records/sec (60.97 MB/sec), 477.4 ms avg latency, 
2006.0 ms max latency.

[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-07-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748367#comment-17748367
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

Thanks for getting back – I was just asking a general question :) 

The DLQ feature seems to be independent though, and I agree that it could be 
useful to add to Kafka Streams.

About the handler: looking into the stacktrace, it seems that the issue is 
actually happening during commit, in particular when offsets are written:
sendOffsetsToTransaction(KafkaProducer.java:757)
This is a totally different code path. The `ProductionExceptionHandler` is 
covering the `producer.send()` code path only. – Looking into the code of both 
3.1 and 3.2, the behavior should be the same: for the call to 
`sendOffsetsToTransaction` the handler won't be triggered.

And for this case, we also cannot trigger the handler, because there is nothing 
to be dropped on the floor – Kafka Streams tries to write offsets to commit a 
TX and we cannot skip writing offsets.
{quote}Our additional testing revealed that "continue" worked in Kafka Streams 
3.1.2, but no longer works since Kafka Streams 3.2.0, as rollback occurs.
{quote}
Did you test this for `send()` or the commit case? For the `send()` case it 
should work for both versions; for the commit case it should not work for 
either version (and is something that cannot be fixed).

Curious to hear about your findings.
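For reference, a minimal handler covering the `send()` path. The interface is the standard Kafka Streams `ProductionExceptionHandler`; the class name here is made up:

{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Consulted only for exceptions raised on the producer.send() path; errors
// from sendOffsetsToTransaction() during commit never reach this handler.
public class ContinueOnSendErrorHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(
            final ProducerRecord<byte[], byte[]> record,
            final Exception exception) {
        // Drop the failed record on the floor and keep processing.
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
// Enabled via: default.production.exception.handler=ContinueOnSendErrorHandler
{code}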

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE is 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> 

[jira] [Updated] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter

2023-07-27 Thread jianbin.chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jianbin.chen updated KAFKA-15264:
-
 Attachment: image-2023-07-28-09-51-01-662.png
Description: 
I was preparing to upgrade from 1.1.0 to 3.5.1's KRaft mode (new cluster 
deployment). In recent comparison testing I found an obvious throughput gap 
when using the following stress test command:

 
{code:java}
./kafka-producer-perf-test.sh --topic test321 --num-records 3000 
--record-size 1024 --throughput -1 --producer-props bootstrap.servers=xxx: 
acks=1
419813 records sent, 83962.6 records/sec (81.99 MB/sec), 241.1 ms avg latency, 
588.0 ms max latency.
555300 records sent, 111015.6 records/sec (108.41 MB/sec), 275.1 ms avg 
latency, 460.0 ms max latency.
552795 records sent, 110536.9 records/sec (107.95 MB/sec), 265.9 ms avg 
latency, 1120.0 ms max latency.
552600 records sent, 110520.0 records/sec (107.93 MB/sec), 284.5 ms avg 
latency, 1097.0 ms max latency.
538500 records sent, 107656.9 records/sec (105.13 MB/sec), 277.5 ms avg 
latency, 610.0 ms max latency.
511545 records sent, 102309.0 records/sec (99.91 MB/sec), 304.1 ms avg latency, 
1892.0 ms max latency.
511890 records sent, 102337.1 records/sec (99.94 MB/sec), 288.4 ms avg latency, 
3000.0 ms max latency.
519165 records sent, 103812.2 records/sec (101.38 MB/sec), 262.1 ms avg 
latency, 1781.0 ms max latency.
513555 records sent, 102669.9 records/sec (100.26 MB/sec), 338.2 ms avg 
latency, 2590.0 ms max latency.
463329 records sent, 92665.8 records/sec (90.49 MB/sec), 276.8 ms avg latency, 
1463.0 ms max latency.
494248 records sent, 98849.6 records/sec (96.53 MB/sec), 327.2 ms avg latency, 
2362.0 ms max latency.
506272 records sent, 101254.4 records/sec (98.88 MB/sec), 322.1 ms avg latency, 
2986.0 ms max latency.
393758 records sent, 78735.9 records/sec (76.89 MB/sec), 387.0 ms avg latency, 
2958.0 ms max latency.
426435 records sent, 85252.9 records/sec (83.25 MB/sec), 363.3 ms avg latency, 
1959.0 ms max latency.
412560 records sent, 82298.0 records/sec (80.37 MB/sec), 374.1 ms avg latency, 
1995.0 ms max latency.
370137 records sent, 73997.8 records/sec (72.26 MB/sec), 396.8 ms avg latency, 
1496.0 ms max latency.
391781 records sent, 78340.5 records/sec (76.50 MB/sec), 410.7 ms avg latency, 
2446.0 ms max latency.
355901 records sent, 71166.0 records/sec (69.50 MB/sec), 397.5 ms avg latency, 
2715.0 ms max latency.
385410 records sent, 77082.0 records/sec (75.28 MB/sec), 417.5 ms avg latency, 
2702.0 ms max latency.
381160 records sent, 76232.0 records/sec (74.45 MB/sec), 407.7 ms avg latency, 
1846.0 ms max latency.
67 records sent, 0.1 records/sec (65.10 MB/sec), 456.2 ms avg latency, 
1414.0 ms max latency.
376251 records sent, 75175.0 records/sec (73.41 MB/sec), 401.9 ms avg latency, 
1897.0 ms max latency.
354434 records sent, 70886.8 records/sec (69.23 MB/sec), 425.8 ms avg latency, 
1601.0 ms max latency.
353795 records sent, 70744.9 records/sec (69.09 MB/sec), 411.7 ms avg latency, 
1563.0 ms max latency.
321993 records sent, 64360.0 records/sec (62.85 MB/sec), 447.3 ms avg latency, 
1975.0 ms max latency.
404075 records sent, 80750.4 records/sec (78.86 MB/sec), 408.4 ms avg latency, 
1753.0 ms max latency.
384526 records sent, 76905.2 records/sec (75.10 MB/sec), 406.0 ms avg latency, 
1833.0 ms max latency.
387652 records sent, 77483.9 records/sec (75.67 MB/sec), 397.3 ms avg latency, 
1927.0 ms max latency.
343286 records sent, 68629.7 records/sec (67.02 MB/sec), 455.6 ms avg latency, 
1685.0 ms max latency.
00 records sent, 66646.7 records/sec (65.08 MB/sec), 456.6 ms avg latency, 
2146.0 ms max latency.
361191 records sent, 72238.2 records/sec (70.55 MB/sec), 409.4 ms avg latency, 
2125.0 ms max latency.
357525 records sent, 71490.7 records/sec (69.82 MB/sec), 436.0 ms avg latency, 
1502.0 ms max latency.
340238 records sent, 68047.6 records/sec (66.45 MB/sec), 427.9 ms avg latency, 
1932.0 ms max latency.
390016 records sent, 77956.4 records/sec (76.13 MB/sec), 418.5 ms avg latency, 
1807.0 ms max latency.
352830 records sent, 70523.7 records/sec (68.87 MB/sec), 439.4 ms avg latency, 
1892.0 ms max latency.
354526 records sent, 70905.2 records/sec (69.24 MB/sec), 429.6 ms avg latency, 
2128.0 ms max latency.
356670 records sent, 71305.5 records/sec (69.63 MB/sec), 408.9 ms avg latency, 
1329.0 ms max latency.
309204 records sent, 60687.7 records/sec (59.27 MB/sec), 438.6 ms avg latency, 
2566.0 ms max latency.
366715 records sent, 72316.1 records/sec (70.62 MB/sec), 474.5 ms avg latency, 
2169.0 ms max latency.
375174 records sent, 75034.8 records/sec (73.28 MB/sec), 429.9 ms avg latency, 
1722.0 ms max latency.
359400 records sent, 70346.4 records/sec (68.70 MB/sec), 432.1 ms avg latency, 
1961.0 ms max latency.
312276 records sent, 62430.2 records/sec (60.97 MB/sec), 477.4 ms avg latency, 
2006.0 ms max latency.

[GitHub] [kafka] jeffkbkim commented on pull request #14117: MINOR: Code cleanups in group-coordinator module

2023-07-27 Thread via GitHub


jeffkbkim commented on PR #14117:
URL: https://github.com/apache/kafka/pull/14117#issuecomment-1654851021

   can you point me to where the log context includes the topic partition info?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter

2023-07-27 Thread jianbin.chen (Jira)
jianbin.chen created KAFKA-15264:


 Summary: Compared with 1.1.0zk, the peak throughput of 3.5.1kraft 
is very jitter
 Key: KAFKA-15264
 URL: https://issues.apache.org/jira/browse/KAFKA-15264
 Project: Kafka
  Issue Type: Bug
Reporter: jianbin.chen


I was preparing to upgrade from 1.1.0 to 3.5.1's KRaft mode (new cluster 
deployment). In recent comparison testing I found an obvious throughput gap 
when using the following stress test command:

 
{code:java}
./kafka-producer-perf-test.sh --topic test321 --num-records 3000 
--record-size 1024 --throughput -1 --producer-props bootstrap.servers=xxx: 
acks=1
419813 records sent, 83962.6 records/sec (81.99 MB/sec), 241.1 ms avg latency, 
588.0 ms max latency.
555300 records sent, 111015.6 records/sec (108.41 MB/sec), 275.1 ms avg 
latency, 460.0 ms max latency.
552795 records sent, 110536.9 records/sec (107.95 MB/sec), 265.9 ms avg 
latency, 1120.0 ms max latency.
552600 records sent, 110520.0 records/sec (107.93 MB/sec), 284.5 ms avg 
latency, 1097.0 ms max latency.
538500 records sent, 107656.9 records/sec (105.13 MB/sec), 277.5 ms avg 
latency, 610.0 ms max latency.
511545 records sent, 102309.0 records/sec (99.91 MB/sec), 304.1 ms avg latency, 
1892.0 ms max latency.
511890 records sent, 102337.1 records/sec (99.94 MB/sec), 288.4 ms avg latency, 
3000.0 ms max latency.
519165 records sent, 103812.2 records/sec (101.38 MB/sec), 262.1 ms avg 
latency, 1781.0 ms max latency.
513555 records sent, 102669.9 records/sec (100.26 MB/sec), 338.2 ms avg 
latency, 2590.0 ms max latency.
463329 records sent, 92665.8 records/sec (90.49 MB/sec), 276.8 ms avg latency, 
1463.0 ms max latency.
494248 records sent, 98849.6 records/sec (96.53 MB/sec), 327.2 ms avg latency, 
2362.0 ms max latency.
506272 records sent, 101254.4 records/sec (98.88 MB/sec), 322.1 ms avg latency, 
2986.0 ms max latency.
393758 records sent, 78735.9 records/sec (76.89 MB/sec), 387.0 ms avg latency, 
2958.0 ms max latency.
426435 records sent, 85252.9 records/sec (83.25 MB/sec), 363.3 ms avg latency, 
1959.0 ms max latency.
412560 records sent, 82298.0 records/sec (80.37 MB/sec), 374.1 ms avg latency, 
1995.0 ms max latency.
370137 records sent, 73997.8 records/sec (72.26 MB/sec), 396.8 ms avg latency, 
1496.0 ms max latency.
391781 records sent, 78340.5 records/sec (76.50 MB/sec), 410.7 ms avg latency, 
2446.0 ms max latency.
355901 records sent, 71166.0 records/sec (69.50 MB/sec), 397.5 ms avg latency, 
2715.0 ms max latency.
385410 records sent, 77082.0 records/sec (75.28 MB/sec), 417.5 ms avg latency, 
2702.0 ms max latency.
381160 records sent, 76232.0 records/sec (74.45 MB/sec), 407.7 ms avg latency, 
1846.0 ms max latency.
67 records sent, 0.1 records/sec (65.10 MB/sec), 456.2 ms avg latency, 
1414.0 ms max latency.
376251 records sent, 75175.0 records/sec (73.41 MB/sec), 401.9 ms avg latency, 
1897.0 ms max latency.
354434 records sent, 70886.8 records/sec (69.23 MB/sec), 425.8 ms avg latency, 
1601.0 ms max latency.
353795 records sent, 70744.9 records/sec (69.09 MB/sec), 411.7 ms avg latency, 
1563.0 ms max latency.
321993 records sent, 64360.0 records/sec (62.85 MB/sec), 447.3 ms avg latency, 
1975.0 ms max latency.
404075 records sent, 80750.4 records/sec (78.86 MB/sec), 408.4 ms avg latency, 
1753.0 ms max latency.
384526 records sent, 76905.2 records/sec (75.10 MB/sec), 406.0 ms avg latency, 
1833.0 ms max latency.
387652 records sent, 77483.9 records/sec (75.67 MB/sec), 397.3 ms avg latency, 
1927.0 ms max latency.
343286 records sent, 68629.7 records/sec (67.02 MB/sec), 455.6 ms avg latency, 
1685.0 ms max latency.
00 records sent, 66646.7 records/sec (65.08 MB/sec), 456.6 ms avg latency, 
2146.0 ms max latency.
361191 records sent, 72238.2 records/sec (70.55 MB/sec), 409.4 ms avg latency, 
2125.0 ms max latency.
357525 records sent, 71490.7 records/sec (69.82 MB/sec), 436.0 ms avg latency, 
1502.0 ms max latency.
340238 records sent, 68047.6 records/sec (66.45 MB/sec), 427.9 ms avg latency, 
1932.0 ms max latency.
390016 records sent, 77956.4 records/sec (76.13 MB/sec), 418.5 ms avg latency, 
1807.0 ms max latency.
352830 records sent, 70523.7 records/sec (68.87 MB/sec), 439.4 ms avg latency, 
1892.0 ms max latency.
354526 records sent, 70905.2 records/sec (69.24 MB/sec), 429.6 ms avg latency, 
2128.0 ms max latency.
356670 records sent, 71305.5 records/sec (69.63 MB/sec), 408.9 ms avg latency, 
1329.0 ms max latency.
309204 records sent, 60687.7 records/sec (59.27 MB/sec), 438.6 ms avg latency, 
2566.0 ms max latency.
366715 records sent, 72316.1 records/sec (70.62 MB/sec), 474.5 ms avg latency, 
2169.0 ms max latency.
375174 records sent, 75034.8 records/sec (73.28 MB/sec), 429.9 ms avg latency, 
1722.0 ms max latency.
359400 records sent, 70346.4 records/sec (68.70 MB/sec), 432.1 ms avg latency, 
1961.0 ms max latency.
312276 

[GitHub] [kafka] philipnee commented on pull request #14086: MINOR: Test assign() and assignment() in the integration test

2023-07-27 Thread via GitHub


philipnee commented on PR #14086:
URL: https://github.com/apache/kafka/pull/14086#issuecomment-1654826392

   Hey @junrao, thanks a lot for reviewing this. I fixed the broken build and 
here are the failing tests, which I believe are unrelated:
   ```
   Build / JDK 17 and Scala 2.13 / testReplication() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest
   3m 11s
   Build / JDK 17 and Scala 2.13 / 
testDescribeAtMinIsrPartitions(String).quorum=kraft – 
kafka.admin.TopicCommandIntegrationTest
   19s
   Build / JDK 8 and Scala 2.12 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   2m 3s
   Build / JDK 8 and Scala 2.12 / [3] idBlockLen=10 – 
kafka.coordinator.transaction.ProducerIdManagerTest
   <1s
   Build / JDK 8 and Scala 2.12 / testSnapshotsGenerated() – 
kafka.server.RaftClusterSnapshotTest
   26s
   Build / JDK 8 and Scala 2.12 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   12s
   Build / JDK 8 and Scala 2.12 / 
shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once] – 
org.apache.kafka.streams.integration.EosIntegrationTest
   12s
   Build / JDK 11 and Scala 2.13 / 
testDescribeAtMinIsrPartitions(String).quorum=kraft – 
kafka.admin.TopicCommandIntegrationTest
   18s
   Build / JDK 11 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   2m 0s
   Build / JDK 20 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   2m 0s
   Build / JDK 20 and Scala 2.13 / testRackAwareRangeAssignor() – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   40s
   Build / JDK 20 and Scala 2.13 / testRackAwareRangeAssignor() – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   42s
   Build / JDK 20 and Scala 2.13 / testBalancePartitionLeaders() – 
org.apache.kafka.controller.QuorumControllerTest
   12s
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276971308


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8540,5 +8568,1182 @@ private static class RebalanceResult {
 this.followerAssignment = followerAssignment;
 }
 }
+
+@Test
+public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+context.createGenericGroup("group-id");
+
+RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+context,
+"group-id",
+"leader-instance-id",
+"follower-instance-id"
+);
+
+SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+.withGroupId("group-id")
+.withGroupInstanceId("leader-instance-id")
+.withMemberId(rebalanceResult.leaderId)
+.withGenerationId(rebalanceResult.generationId)
+.build();
+
+CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+assertTrue(result.records().isEmpty());
+assertTrue(syncFuture.isDone());
+assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+
+HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+.setGroupId("group-id")
+.setMemberId(rebalanceResult.leaderId)
+.setGenerationId(rebalanceResult.generationId);
+
+HeartbeatResponseData validHeartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode());
+
+HeartbeatResponseData inValidHeartbeatResponse = 
context.sendGenericGroupHeartbeat(
+heartbeatRequest.setGroupInstanceId("leader-instance-id")
+.setMemberId("invalid-member-id"));
+
+assertEquals(Errors.FENCED_INSTANCE_ID.code(), 
inValidHeartbeatResponse.errorCode());
+}
+
+@Test
+public void testHeartbeatUnknownGroup() {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+
+HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+.setGroupId("group-id")
+.setMemberId("member-id")
+.setGenerationId(-1);
+
+HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+}
+
+@Test
+public void testHeartbeatDeadGroup() {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+GenericGroup group = context.createGenericGroup("group-id");
+
+group.transitionTo(DEAD);
+
+HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+.setGroupId("group-id")
+.setMemberId("member-id")
+.setGenerationId(-1);
+
+HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), 
heartbeatResponse.errorCode());
+}
+
+@Test
+public void testHeartbeatEmptyGroup() {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+GenericGroup group = context.createGenericGroup("group-id");
+
+JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection();
+protocols.add(new JoinGroupRequestProtocol()
+.setName("range")
+.setMetadata(new byte[]{0}));
+
+group.add(new GenericGroupMember(
+"member-id",
+Optional.empty(),
+"client-id",
+"client-host",
+1,
+5000,
+"consumer",
+protocols
+));
+
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(EMPTY);
+
+HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+.setGroupId("group-id")
+.setMemberId("member-id")
+.setGenerationId(0);
+
+HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+}
+
+@Test
+public void testHeartbeatUnknownMemberExistingGroup() throws Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+GenericGroup group = context.createGenericGroup("group-id");
+
+
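For orientation, the expectations asserted by the truncated hunk above can be condensed into a small, self-contained sketch. All types below are local stand-ins, not Kafka's classes, and the real logic in GroupMetadataManager is more involved:

    // Stand-in model of the generic-group heartbeat error paths these tests assert.
    enum GroupState { EMPTY, DEAD, STABLE }
    enum HeartbeatError { NONE, UNKNOWN_MEMBER_ID, COORDINATOR_NOT_AVAILABLE, FENCED_INSTANCE_ID }

    final class HeartbeatErrorSketch {
        static HeartbeatError errorFor(GroupState state, boolean memberKnown, boolean staticIdFenced) {
            if (state == GroupState.DEAD) {
                return HeartbeatError.COORDINATOR_NOT_AVAILABLE; // testHeartbeatDeadGroup
            }
            if (staticIdFenced) {
                return HeartbeatError.FENCED_INSTANCE_ID;        // stale member id for a known static member
            }
            if (state == GroupState.EMPTY || !memberKnown) {
                return HeartbeatError.UNKNOWN_MEMBER_ID;         // empty group, unknown group, or unknown member
            }
            return HeartbeatError.NONE;                          // valid member, valid generation
        }
    }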

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276959521



[GitHub] [kafka] cmccabe merged pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-27 Thread via GitHub


cmccabe merged PR #13643:
URL: https://github.com/apache/kafka/pull/13643


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276944101



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276943880



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276942521



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276939557



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276936889



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276923811



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276922896



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276921862


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
+group.transitionTo(PREPARING_REBALANCE);
+group.transitionTo(EMPTY);

Review Comment:
   Thanks. Removed the transitions; the group starts off as empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276919519


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8540,5 +8568,1182 @@ private static class RebalanceResult {
 this.followerAssignment = followerAssignment;
 }
 }
+
+@Test
+public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+context.createGenericGroup("group-id");
+
+RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(

Review Comment:
   resolved via rebase



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276919382


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8540,5 +8568,1182 @@ private static class RebalanceResult {
 this.followerAssignment = followerAssignment;
 }
 }
+
+@Test
+public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws 
Exception {
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.build();
+context.createGenericGroup("group-id");

Review Comment:
   Thanks for the catch. Removed from all tests that call 
staticMembersJoinAndRebalance



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator

2023-07-27 Thread via GitHub


jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276874868


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -368,9 +369,29 @@ public CompletableFuture<HeartbeatResponseData> heartbeat(
 return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
 }
 
-return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-"This API is not implemented yet."
-));
+if (!isGroupIdNotEmpty(request.groupId())) {
+return CompletableFuture.completedFuture(new 
HeartbeatResponseData()
+.setErrorCode(Errors.INVALID_GROUP_ID.code()));
+}
+
+return runtime.scheduleReadOperation("generic-group-heartbeat",
+topicPartitionFor(request.groupId()),
+(coordinator, __) -> coordinator.genericGroupHeartbeat(context, 
request)

Review Comment:
   as discussed offline, we will keep it as is since it is how the current 
coordinator behaves.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee opened a new pull request, #14118: KAFKA-14875: Implement wakeup

2023-07-27 Thread via GitHub


philipnee opened a new pull request, #14118:
URL: https://github.com/apache/kafka/pull/14118

   Continuation of https://github.com/apache/kafka/pull/13490.  I closed the 
original one due to rebase difficulties.
   
   **Summary**
   Implemented the wakeup() mechanism using a WakeupTrigger class that stores the 
pending wakeup item in an atomic reference; when wakeup() is invoked, it checks 
whether there is an active task or a pending wakeup task (a minimal sketch of 
this pattern follows below).
   - If there is an active task: the task is completed exceptionally and the 
atomic reference is freed up.
   - If there is a pending wakeup task, wakeup() was invoked before a blocking 
call was issued; therefore, the next task is completed exceptionally 
immediately.
   
   This PR also addresses minor issues such as:
   1. Throwing WakeupException in the right place: since wakeups are delivered by 
completing an active future exceptionally, the WakeupException arrives wrapped 
inside an ExecutionException.
   2. mockConstruction is a thread-local mock; therefore, we need to free up the 
reference before completing the test. Otherwise, other tests will continue 
using the thread-local mock.
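   
   A minimal, self-contained sketch of this mechanism (names and details are 
illustrative stand-ins, not the PR's actual WakeupTrigger; a plain 
RuntimeException stands in for Kafka's WakeupException):
   
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicReference;
   
   // One slot holds either the active blocking task or a marker recording
   // that wakeup() already fired before any task was registered.
   final class WakeupTriggerSketch {
       private static final Object WAKEUP_PENDING = new Object();
       private final AtomicReference<Object> slot = new AtomicReference<>();
   
       void wakeup() {
           Object prev = slot.getAndSet(WAKEUP_PENDING);
           if (prev instanceof CompletableFuture) {
               // An active blocking call exists: fail it now and free the slot.
               ((CompletableFuture<?>) prev).completeExceptionally(new RuntimeException("wakeup"));
               slot.set(null);
           }
           // Otherwise the marker stays, so the next registered task fails fast.
       }
   
       <T> CompletableFuture<T> register(CompletableFuture<T> task) {
           Object prev = slot.getAndSet(task);
           if (prev == WAKEUP_PENDING) {
               // wakeup() arrived before this blocking call was issued.
               task.completeExceptionally(new RuntimeException("wakeup"));
               slot.set(null);
           }
           return task;
       }
   }
   
   A caller blocking on task.get() then observes the failure wrapped in an 
ExecutionException, matching point 1 above.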


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee closed pull request #13490: KAFKA-14875: Implement wakeup

2023-07-27 Thread via GitHub


philipnee closed pull request #13490: KAFKA-14875: Implement wakeup
URL: https://github.com/apache/kafka/pull/13490


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


rreddy-22 commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276830477


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##
@@ -764,4 +813,39 @@ public void testNewOffsetCommitTombstoneRecord() {
 Record record = 
RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1);
 assertEquals(expectedRecord, record);
 }
+
+// For the purpose of testing let:
+//  a) Number of replicas for each partition = 2
+//  b) Number of racks available in the cluster = 4

Review Comment:
   Okay I will add it to the javadoc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


rreddy-22 commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276830285


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -5567,7 +5600,7 @@ public void testJoinGroupAppendErrorConversion() {
 assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE);
 }
 
-private <T> void assertUnorderedListEquals(
+public static <T> void assertUnorderedListEquals(

Review Comment:
   Yeah, that makes sense. I'll have to move all the corresponding methods to 
that class as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #14084: [MINOR] Add latest versions to kraft upgrade kafkatest

2023-07-27 Thread via GitHub


rondagostino commented on code in PR #14084:
URL: https://github.com/apache/kafka/pull/14084#discussion_r1276771939


##
tests/kafkatest/version.py:
##
@@ -252,3 +252,7 @@ def get_version(node=None):
 V_3_5_0 = KafkaVersion("3.5.0")
 V_3_5_1 = KafkaVersion("3.5.1")
 LATEST_3_5 = V_3_5_1
+
+# 3.6.x versions
+V_3_6_0 = KafkaVersion("3.6.0")
+LATEST_3_6 = V_3_6_0

Review Comment:
   Ok, we'll just leave it here and add it back in now. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable

2023-07-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-14976:
--

Assignee: Almog Gavra

> Left/outer stream-stream joins create KV stores that aren't customizable
> 
>
> Key: KAFKA-14976
> URL: https://issues.apache.org/jira/browse/KAFKA-14976
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Almog Gavra
>Priority: Major
>  Labels: needs-kip
>
> It appears that we only give the illusion of full customizability when it 
> comes to the state stores of a windowed join. This arose due to an 
> [optimization|https://github.com/apache/kafka/pull/11252] for the performance 
> of the spurious results fix, and means that these joins now come with one 
> additional, and possibly unexpected, state store:
>  
> {code:java}
> final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder =
>             new ListValueStoreBuilder<>(
>                 persistent ?                                     // <-- this hardcoded choice of
>                     Stores.persistentKeyValueStore(storeName) :  // <-- RocksDB vs. in-memory
>                     Stores.inMemoryKeyValueStore(storeName),     // <-- cannot be customized
>                 timestampedKeyAndJoinSideSerde,
>                 leftOrRightValueSerde,
>                 Time.SYSTEM
>             ); {code}
>  
> where persistent is defined above that as
> {code:java}
> final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null 
> || streamJoinedInternal.thisStoreSupplier().get().persistent(); {code}
>  
> This means that regardless of whether a custom state store implementation was 
> passed in to the join, we will still insert one of our RocksDB or in-memory 
> state stores, which might be very surprising since the API makes it seem like 
> the underlying stores are fully configurable.
> I'm adding a warning line for this in PR 
> [#13682|https://github.com/apache/kafka/pull/13682/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R334-R336]
>  but we should really make this hidden state store fully configurable like 
> the window stores currently are (which will require a KIP)
>  
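
To make the surprise concrete, here is a hypothetical left join that supplies 
custom in-memory window stores through StreamJoined (illustrative usage against 
the Kafka Streams 3.x API; topic and store names are made up). The built 
topology still gains an internal key-value list store whose persistence merely 
mirrors thisStoreSupplier and cannot be replaced:

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.state.Stores;

public class HiddenJoinStoreExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("left-topic");
        KStream<String, String> right = builder.stream("right-topic");

        Duration windowSize = Duration.ofMinutes(5);
        // Custom in-memory stores for both sides; stream-stream join stores
        // must retain duplicates.
        StreamJoined<String, String, String> joined = StreamJoined
            .with(Serdes.String(), Serdes.String(), Serdes.String())
            .withThisStoreSupplier(Stores.inMemoryWindowStore(
                "this-join-store", windowSize.multipliedBy(2), windowSize, true))
            .withOtherStoreSupplier(Stores.inMemoryWindowStore(
                "other-join-store", windowSize.multipliedBy(2), windowSize, true));

        left.leftJoin(right, (l, r) -> l + "," + r,
            JoinWindows.ofTimeDifferenceWithNoGrace(windowSize), joined);

        // The description reveals a third, internal key-value store (in-memory
        // here only because thisStoreSupplier is in-memory).
        System.out.println(builder.build().describe());
    }
}
{code}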



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable

2023-07-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14976:
---
Labels: kip  (was: needs-kip)

> Left/outer stream-stream joins create KV stores that aren't customizable
> 
>
> Key: KAFKA-14976
> URL: https://issues.apache.org/jira/browse/KAFKA-14976
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Almog Gavra
>Priority: Major
>  Labels: kip
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable

2023-07-27 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748314#comment-17748314
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14976:


Will be addressed by 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types

> Left/outer stream-stream joins create KV stores that aren't customizable
> 
>
> Key: KAFKA-14976
> URL: https://issues.apache.org/jira/browse/KAFKA-14976
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ableegoldman commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-27 Thread via GitHub


ableegoldman commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1654436865

   @flashmouse you can decrease the consumer count. For one thing, this covers 
the edge case where the consumers do not all have the same topic subscription, 
which seems to be quite rare. For another, 2,000 consumers is fairly large to 
begin with. And especially large for a consumer group where the consumers are 
all given different topic subscriptions.
   
   Honestly it's probably more important to test with a higher partition count 
than a large group of consumers like this. Are you able to get it passing 
within 90s with 500 consumers and 5,000 partitions? That feels like a more 
realistic large-scale test case. If not, fix it at just 500 consumers and 
maybe run the test with 4,000, then 3,000, then 2,000 partitions, etc., until 
you can get it to pass.
   
   If it's still not passing with 500 consumers and 2000 partitions, that's a 
bit troubling. But we can cross that bridge if it comes up


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276742165


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -5567,7 +5600,7 @@ public void testJoinGroupAppendErrorConversion() {
 assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE);
 }
 
-private <T> void assertUnorderedListEquals(
+public static <T> void assertUnorderedListEquals(

Review Comment:
   I see. I wonder if we should actually move `recordEquals` to 
`RecordHelpersTest`. The dependency would make more sense this way, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276739854


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##
@@ -764,4 +813,39 @@ public void testNewOffsetCommitTombstoneRecord() {
 Record record = 
RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1);
 assertEquals(expectedRecord, record);
 }
+
+// For the purpose of testing let:
+//  a) Number of replicas for each partition = 2
+//  b) Number of racks available in the cluster = 4

Review Comment:
   The javadoc is definitely a better place than this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #14117: MINOR: Code cleanups in group-coordinator module

2023-07-27 Thread via GitHub


dajac opened a new pull request, #14117:
URL: https://github.com/apache/kafka/pull/14117

   This patch does a few code cleanups in the group-coordinator module.
   - It renames `Coordinator` to `CoordinatorShard`;
   - It renames `ReplicatedGroupCoordinator` to `GroupCoordinatorShard`. I was 
never really happy with this name. The new name makes more sense to me;
   - It removes `TopicPartition` from the `GroupMetadataManager`. It was only 
used in log messages. The log context already includes it so we don't have to 
log it again.
   - It renames `assignors` to `consumerGroupAssignors`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph opened a new pull request, #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-07-27 Thread via GitHub


kamalcph opened a new pull request, #14116:
URL: https://github.com/apache/kafka/pull/14116

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


rreddy-22 commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276710063


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##
@@ -764,4 +813,39 @@ public void testNewOffsetCommitTombstoneRecord() {
 Record record = 
RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1);
 assertEquals(expectedRecord, record);
 }
+
+// For the purpose of testing let:
+//  a) Number of replicas for each partition = 2
+//  b) Number of racks available in the cluster = 4

Review Comment:
   It's applicable to both the mkMapOfPartitionRacks and mkListOfPartitionRacks 
methods, to show why there are two rackIds in each set and why the mod is 4. I 
got confused when I saw the old tests and didn't understand why a certain 
calculation was made and what the assumptions behind it were. I initially added 
it to the javadoc for the first method but realised it probably doesn't belong 
there. I'm not sure if I should place it inside the method for both, or where 
the correct placement would be.
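   
   For readers of the thread, a minimal sketch of what such a helper could 
look like under those stated assumptions (2 replicas per partition, 4 racks); 
the actual mkMapOfPartitionRacks in RecordHelpersTest may differ (assumes 
java.util imports):
   
       // Hypothetical sketch: partition i gets the racks of its two replicas,
       // assigned round-robin across four racks (hence the two rackIds per
       // set and the mod 4).
       private static Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) {
           Map<Integer, Set<String>> partitionRacks = new HashMap<>(numPartitions);
           for (int i = 0; i < numPartitions; i++) {
               partitionRacks.put(i, new HashSet<>(Arrays.asList(
                   "rack" + (i % 4), "rack" + ((i + 1) % 4))));
           }
           return partitionRacks;
       }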



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-27 Thread via GitHub


cmccabe commented on PR #13643:
URL: https://github.com/apache/kafka/pull/13643#issuecomment-1654254344

   > I couldn't find a test for the new KafkaRaftClient::scheduleAtomicAppend.
   
   I added `KafkaRaftClientTest.testAppendWithRequiredBaseOffset`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


rreddy-22 commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276706144


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -5692,12 +5725,60 @@ private void assertApiMessageAndVersionEquals(
 
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
 
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()),
 
fromTopicPartitions(actualValue.partitionsPendingAssignment()));
-} else {
-assertEquals(expected.message(), actual.message());
+} else if (actual.message() instanceof 
ConsumerGroupPartitionMetadataValue) {
+// The order of the racks stored in PartitionMetadata of 
ConsumerGroupPartitionMetadataValue is not
+// always guaranteed. Therefore, we need a special comparator.
+ConsumerGroupPartitionMetadataValue expectedValue =
+(ConsumerGroupPartitionMetadataValue) expected.message();
+ConsumerGroupPartitionMetadataValue actualValue =
+(ConsumerGroupPartitionMetadataValue) actual.message();
+
+List<ConsumerGroupPartitionMetadataValue.TopicMetadata> 
expectedTopicMetadataList =
+expectedValue.topics();
+List<ConsumerGroupPartitionMetadataValue.TopicMetadata> 
actualTopicMetadataList =
+actualValue.topics();
+
+if (expectedTopicMetadataList.size() != 
actualTopicMetadataList.size()) {
+fail("Topic metadata lists have different sizes");
+}
+
+for (int i = 0; i < expectedValue.topics().size(); i++) {
+ConsumerGroupPartitionMetadataValue.TopicMetadata 
expectedTopicMetadata =
+expectedTopicMetadataList.get(i);
+ConsumerGroupPartitionMetadataValue.TopicMetadata 
actualTopicMetadata =
+actualTopicMetadataList.get(i);
+
+assertEquals(expectedTopicMetadata.topicId(), 
actualTopicMetadata.topicId());
+assertEquals(expectedTopicMetadata.topicName(), 
actualTopicMetadata.topicName());
+assertEquals(expectedTopicMetadata.numPartitions(), 
actualTopicMetadata.numPartitions());
+
+List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
expectedPartitionMetadataList =
+expectedTopicMetadata.partitionMetadata();
+List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
actualPartitionMetadataList =
+actualTopicMetadata.partitionMetadata();
+
+// If the list is empty, rack information wasn't available for 
any replica of
+// the partition and hence, the entry wasn't added to the 
record.
+if (expectedPartitionMetadataList.size() != 
actualPartitionMetadataList.size()) {
+fail("Partition metadata lists have different sizes");
+} else if (!expectedPartitionMetadataList.isEmpty() && 
!actualPartitionMetadataList.isEmpty()) {
+for (int j = 0; j < expectedTopicMetadata.numPartitions(); 
j++) {
+ConsumerGroupPartitionMetadataValue.PartitionMetadata 
expectedPartitionMetadata =
+expectedPartitionMetadataList.get(j);
+ConsumerGroupPartitionMetadataValue.PartitionMetadata 
actualPartitionMetadata =
+actualPartitionMetadataList.get(j);
+
+assertEquals(expectedPartitionMetadata.partition(), 
actualPartitionMetadata.partition());
+
assertUnorderedListEquals(expectedPartitionMetadata.racks(), 
actualPartitionMetadata.racks());
+}
+} else {
+assertEquals(expected.message(), actual.message());

Review Comment:
   Sorry that was indented wrong, this is the else for the outermost if and 
else if where the message type is checked



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-27 Thread via GitHub


cmccabe commented on PR #13643:
URL: https://github.com/apache/kafka/pull/13643#issuecomment-1654251894

   > @cmccabe, I don't follow this comment. When the client calls 
KafkaRaftClient::schedule{Atomic}Append, the KafkaRaftClient compares the 
provided offset with the nextOffset stored in the BatchAccumulator. If we want 
this method to succeed in most cases, KafkaRaftClient::logEndOffset should 
return that offset (BatchAccumulator::nextOffset) and not the log end offset.
   
   There can't be anything in the accumulator when we become active, because we 
are not adding things to the accumulator when we are inactive. Therefore all we 
need to know is the end of the log.
   
   After becoming active, the active controller tracks its own offset and 
doesn't need to access the offset in BatchAccumulator or the end of the log 
offset.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


rreddy-22 commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276701201


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -5567,7 +5600,7 @@ public void testJoinGroupAppendErrorConversion() {
 assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE);
 }
 
-private <T> void assertUnorderedListEquals(
+public static <T> void assertUnorderedListEquals(

Review Comment:
   My bad, I only had to make the recordEquals method public to be able to use 
it in RecordHelpersTest; reverted the others to private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


rreddy-22 commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276693242


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java:
##
@@ -40,23 +45,31 @@ public class TopicMetadata {
  */
 private final int numPartitions;
 
+/**
+ * Map of every partition to a set of its rackIds.
+ * If the rack information is unavailable, this is an empty map.

Review Comment:
   changed the wording



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-27 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1276690892


##
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##
@@ -171,16 +172,21 @@ default void beginShutdown() {}
  * to resign its leadership. The state machine is expected to discard all
  * uncommitted entries after observing an epoch change.
  *
+ * If the current base offset does not match the supplied required base 
offset,
+ * then this method will throw {@link UnexpectedBaseOffsetException}.
+ *
  * @param epoch the current leader epoch
+ * @param requiredBaseOffset if this is set, it is the offset we must use 
as the base offset.
  * @param records the list of records to append
  * @return the expected offset of the last record if append succeed
  * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if 
the size of the records is greater than the maximum
  * batch size; if this exception is thrown, none of the elements in 
records were
  * committed
  * @throws NotLeaderException if we are not the current leader or the 
epoch doesn't match the leader epoch
  * @throws BufferAllocationException we failed to allocate memory for the 
records
+ * @throws UnexpectedBaseOffsetException the requested base offset could 
not be obtained.
  */
-long scheduleAtomicAppend(int epoch, List<T> records);
+long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, 
List<T> records);

Review Comment:
   The controller doesn't use `scheduleAppend`. We only use 
`scheduleAtomicAppend`.
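   
   For illustration, a hedged caller-side sketch based only on the signature 
and javadoc shown in this diff (raftClient, currentEpoch, expectedBaseOffset 
and records are made-up names):
   
       // Append atomically, requiring the batch to start at a known base
       // offset. Per the javadoc above, a mismatch surfaces as
       // UnexpectedBaseOffsetException instead of appending elsewhere.
       try {
           long lastOffset = raftClient.scheduleAtomicAppend(
               currentEpoch,
               OptionalLong.of(expectedBaseOffset),
               records);
           // lastOffset is the expected offset of the last appended record
       } catch (UnexpectedBaseOffsetException e) {
           // base offset not available; re-read state and retry if appropriate
       }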



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276677238


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -440,12 +442,13 @@ public void testUpdateSubscriptionMetadata() {
 // It should return foo now.
 assertEquals(
 mkMap(
-mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
+mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
Collections.emptyMap()))

Review Comment:
   Should we update `testUpdateSubscriptionMetadata` to include racks for some 
topics? Otherwise, it seems that we don't really test the logic newly added to 
`computeSubscriptionMetadata`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276671418


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -84,14 +85,15 @@ public void addGroupMember(
 
 public Uuid addTopicMetadata(
 String topicName,
-int numPartitions
+int numPartitions,
+Map<Integer, Set<String>> partitionRacks
 ) {
 Uuid topicId = Uuid.randomUuid();
 subscriptionMetadata.put(topicName, new TopicMetadata(
 topicId,
 topicName,
-numPartitions
-));
+numPartitions,
+partitionRacks));

Review Comment:
   nit: Let's bring back the closing parentheses as they were.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276670928


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadataTest.java:
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class SubscribedTopicMetadataTest {
+
+@Test
+public void testAttribute() {
+Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>();
+for (int i = 0; i < 5; i++) {
+Uuid topicId = Uuid.randomUuid();
+String topicName = "topic" + i;
+Map<Integer, Set<String>> partitionRacks = 
mkMapOfPartitionRacks(5);
+topicMetadataMap.put(
+topicId,
+new TopicMetadata(topicId, topicName, 5, partitionRacks)
+);
+}
+assertEquals(topicMetadataMap, new 
SubscribedTopicMetadata(topicMetadataMap).topicMetadata());
+}
+
+@Test
+public void testTopicMetadataCannotBeNull() {
+assertThrows(NullPointerException.class, () -> new 
SubscribedTopicMetadata(null));
+}
+
+@Test
+public void testEquals() {
+Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>();
+for (int i = 0; i < 5; i++) {
+Uuid topicId = Uuid.randomUuid();
+String topicName = "topic" + i;
+Map<Integer, Set<String>> partitionRacks = 
mkMapOfPartitionRacks(5);
+topicMetadataMap.put(
+topicId,
+new TopicMetadata(topicId, topicName, 5, partitionRacks)
+);
+}
+SubscribedTopicMetadata subscribedTopicMetadata = new 
SubscribedTopicMetadata(topicMetadataMap);
+assertEquals(new SubscribedTopicMetadata(topicMetadataMap), 
subscribedTopicMetadata);
+
+Map<Uuid, TopicMetadata> topicMetadataMap2 = new HashMap<>();
+Uuid topicId = Uuid.randomUuid();
+topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 
5, Collections.emptyMap()));
+assertNotEquals(new SubscribedTopicMetadata(topicMetadataMap2), 
subscribedTopicMetadata);
+}
+}

Review Comment:
   nit: Let's add a new line at the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276670286


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -571,4 +716,14 @@ private void assertAssignment(Map<String, Map<Uuid, Set<Integer>>> expectedAssig
 assertEquals(expectedAssignment.get(memberId), 
computedAssignmentForMember);
 }
 }
+
+// When rack awareness is enabled for this assignor, rack information can 
be updated in this method.
+private Map<Integer, Set<String>> createPartitionMetadata(int 
numPartitions) {

Review Comment:
   nit: static?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276669652


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##
@@ -764,4 +813,39 @@ public void testNewOffsetCommitTombstoneRecord() {
 Record record = 
RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1);
 assertEquals(expectedRecord, record);
 }
+
+// For the purpose of testing let:
+//  a) Number of replicas for each partition = 2
+//  b) Number of racks available in the cluster = 4

Review Comment:
   Should we remove this? It does not make sense here. Or was it supposed to be 
somewhere else?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276668936


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##
@@ -612,15 +616,14 @@ public void 
testNewGroupMetadataRecordThrowsWhenEmptyAssignment() {
 MetadataVersion.IBP_3_5_IV2
 ));
 }
-

Review Comment:
   It is not really about the empty line bit more about the unnecessary spaces 
on it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r127851


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -5692,12 +5725,60 @@ private void assertApiMessageAndVersionEquals(
 
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
 
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()),
 
fromTopicPartitions(actualValue.partitionsPendingAssignment()));
-} else {
-assertEquals(expected.message(), actual.message());
+} else if (actual.message() instanceof 
ConsumerGroupPartitionMetadataValue) {
+// The order of the racks stored in PartitionMetadata of 
ConsumerGroupPartitionMetadataValue is not
+// always guaranteed. Therefore, we need a special comparator.
+ConsumerGroupPartitionMetadataValue expectedValue =
+(ConsumerGroupPartitionMetadataValue) expected.message();
+ConsumerGroupPartitionMetadataValue actualValue =
+(ConsumerGroupPartitionMetadataValue) actual.message();
+
+List<ConsumerGroupPartitionMetadataValue.TopicMetadata> 
expectedTopicMetadataList =
+expectedValue.topics();
+List<ConsumerGroupPartitionMetadataValue.TopicMetadata> 
actualTopicMetadataList =
+actualValue.topics();
+
+if (expectedTopicMetadataList.size() != 
actualTopicMetadataList.size()) {
+fail("Topic metadata lists have different sizes");
+}
+
+for (int i = 0; i < expectedValue.topics().size(); i++) {
+ConsumerGroupPartitionMetadataValue.TopicMetadata 
expectedTopicMetadata =
+expectedTopicMetadataList.get(i);
+ConsumerGroupPartitionMetadataValue.TopicMetadata 
actualTopicMetadata =
+actualTopicMetadataList.get(i);
+
+assertEquals(expectedTopicMetadata.topicId(), 
actualTopicMetadata.topicId());
+assertEquals(expectedTopicMetadata.topicName(), 
actualTopicMetadata.topicName());
+assertEquals(expectedTopicMetadata.numPartitions(), 
actualTopicMetadata.numPartitions());
+
+List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
expectedPartitionMetadataList =
+expectedTopicMetadata.partitionMetadata();
+List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
actualPartitionMetadataList =
+actualTopicMetadata.partitionMetadata();
+
+// If the list is empty, rack information wasn't available for 
any replica of
+// the partition and hence, the entry wasn't added to the 
record.
+if (expectedPartitionMetadataList.size() != 
actualPartitionMetadataList.size()) {
+fail("Partition metadata lists have different sizes");
+} else if (!expectedPartitionMetadataList.isEmpty() && 
!actualPartitionMetadataList.isEmpty()) {
+for (int j = 0; j < expectedTopicMetadata.numPartitions(); 
j++) {
+ConsumerGroupPartitionMetadataValue.PartitionMetadata 
expectedPartitionMetadata =
+expectedPartitionMetadataList.get(j);
+ConsumerGroupPartitionMetadataValue.PartitionMetadata 
actualPartitionMetadata =
+actualPartitionMetadataList.get(j);
+
+assertEquals(expectedPartitionMetadata.partition(), 
actualPartitionMetadata.partition());
+
assertUnorderedListEquals(expectedPartitionMetadata.racks(), 
actualPartitionMetadata.racks());
+}
+} else {
+assertEquals(expected.message(), actual.message());

Review Comment:
   Why are we doing this? If the partition metadata are empty, we could just 
run the previous branch as well. It would be a no-op.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276663083


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -5692,12 +5725,60 @@ private void assertApiMessageAndVersionEquals(
 
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
 
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()),
 
fromTopicPartitions(actualValue.partitionsPendingAssignment()));
-} else {
-assertEquals(expected.message(), actual.message());
+} else if (actual.message() instanceof 
ConsumerGroupPartitionMetadataValue) {
+// The order of the racks stored in PartitionMetadata of 
ConsumerGroupPartitionMetadataValue is not
+// always guaranteed. Therefore, we need a special comparator.
+ConsumerGroupPartitionMetadataValue expectedValue =
+(ConsumerGroupPartitionMetadataValue) expected.message();
+ConsumerGroupPartitionMetadataValue actualValue =
+(ConsumerGroupPartitionMetadataValue) actual.message();
+
+List<ConsumerGroupPartitionMetadataValue.TopicMetadata> 
expectedTopicMetadataList =
+expectedValue.topics();
+List<ConsumerGroupPartitionMetadataValue.TopicMetadata> 
actualTopicMetadataList =
+actualValue.topics();
+
+if (expectedTopicMetadataList.size() != 
actualTopicMetadataList.size()) {
+fail("Topic metadata lists have different sizes");
+}
+
+for (int i = 0; i < expectedValue.topics().size(); i++) {
+ConsumerGroupPartitionMetadataValue.TopicMetadata 
expectedTopicMetadata =
+expectedTopicMetadataList.get(i);
+ConsumerGroupPartitionMetadataValue.TopicMetadata 
actualTopicMetadata =
+actualTopicMetadataList.get(i);
+
+assertEquals(expectedTopicMetadata.topicId(), 
actualTopicMetadata.topicId());
+assertEquals(expectedTopicMetadata.topicName(), 
actualTopicMetadata.topicName());
+assertEquals(expectedTopicMetadata.numPartitions(), 
actualTopicMetadata.numPartitions());
+
+List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
expectedPartitionMetadataList =
+expectedTopicMetadata.partitionMetadata();
+List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
actualPartitionMetadataList =
+actualTopicMetadata.partitionMetadata();
+
+// If the list is empty, rack information wasn't available for 
any replica of
+// the partition and hence, the entry wasn't added to the 
record.
+if (expectedPartitionMetadataList.size() != 
actualPartitionMetadataList.size()) {
+fail("Partition metadata lists have different sizes");
+} else if (!expectedPartitionMetadataList.isEmpty() && 
!actualPartitionMetadataList.isEmpty()) {
+for (int j = 0; j < expectedTopicMetadata.numPartitions(); 
j++) {

Review Comment:
   Using numPartitions seems incorrect here, as it could be larger than the 
number of partition metadata entries.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276659754


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -5692,12 +5725,60 @@ private void assertApiMessageAndVersionEquals(
 
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
 
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()),
 
fromTopicPartitions(actualValue.partitionsPendingAssignment()));
-} else {
-assertEquals(expected.message(), actual.message());
+} else if (actual.message() instanceof 
ConsumerGroupPartitionMetadataValue) {
+// The order of the racks stored in PartitionMetadata of 
ConsumerGroupPartitionMetadataValue is not
+// always guaranteed. Therefore, we need a special comparator.
+ConsumerGroupPartitionMetadataValue expectedValue =
+(ConsumerGroupPartitionMetadataValue) expected.message();
+ConsumerGroupPartitionMetadataValue actualValue =
+(ConsumerGroupPartitionMetadataValue) actual.message();
+

Review Comment:
   Should we assert numPartitions as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276653838


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -5567,7 +5600,7 @@ public void testJoinGroupAppendErrorConversion() {
 assertEquals(Errors.LEADER_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE);
 }
 
-private <T> void assertUnorderedListEquals(
+public static <T> void assertUnorderedListEquals(

Review Comment:
   Is there a reason for making it public? I have the same question for the 
following two methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14748) Relax non-null FK left-join requirement

2023-07-27 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748285#comment-17748285
 ] 

Guozhang Wang commented on KAFKA-14748:
---

[~aki] Thanks for picking up this series! I think we can have a light KIP just to 
summarize:

1. All the operator behavioral changes among these JIRAs.
2. Why we do not make it an opt-in; and therefore:
3. For users who still want the old behavior, what should they change in their 
code.


> Relax non-null FK left-join requirement
> ---
>
> Key: KAFKA-14748
> URL: https://issues.apache.org/jira/browse/KAFKA-14748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all 
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If 
> it returns `null`, it's treated as invalid. For left-joins, it might make 
> sense to still accept a `null`, and add the left-hand record with an empty 
> right-hand side to the result.
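For concreteness, a sketch of the proposed semantics (Order and Customer are
hypothetical POJOs; builder is a StreamsBuilder; current releases drop such
records instead of emitting them):

    // Proposed behavior sketch: when the foreign-key extractor returns null,
    // the left record would still be emitted, joined with a null right side.
    KTable<String, Order> orders = builder.table("orders");
    KTable<String, Customer> customers = builder.table("customers");
    KTable<String, String> enriched = orders.leftJoin(
        customers,
        order -> order.customerId,  // may be null under this proposal
        (order, customer) ->
            order.id + ":" + (customer == null ? "no-customer" : customer.name));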



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-07-27 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748284#comment-17748284
 ] 

Guozhang Wang commented on KAFKA-12317:
---

[~mjsax] Though it may not introduce any new configs or interfaces, I'd still 
suggest we have a light KIP for the behavioral change; we would also definitely 
want to add a section in the upgrade guide for this.

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>
> Currently, for stream-stream and stream-table/globalTable joins, 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key for 
> stream-globalTable), because for a `null`-(join)key the join is undefined: 
> ie, we don't have an attribute to do the table lookup (we consider the 
> stream-record as malformed). Note that we define the semantics of 
> _left/outer_ join as: keep the stream record if no matching join record was 
> found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat it as a 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users that want to keep the current behavior can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires changing the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key records 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. Users need to be aware (ie, we might need to 
> put this into the docs and JavaDocs) that records with a `null`-key would be 
> partitioned randomly.
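A small sketch of the opt-out mentioned above, for users who want to keep the
current drop-null behavior once the semantics are relaxed (topic names are
hypothetical; builder is a StreamsBuilder):

    // Keep the old behavior explicitly: drop null-key records before the join.
    KStream<String, String> stream = builder.stream("events");
    KTable<String, String> table = builder.table("lookup");
    stream.filter((key, value) -> key != null)
          .leftJoin(table, (value, tableValue) -> value + "/" + tableValue)
          .to("joined-output");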



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15240) BrokerToControllerChannelManager cache activeController error cause DefaultAlterPartitionManager send AlterPartition request failed

2023-07-27 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748281#comment-17748281
 ] 

Guozhang Wang commented on KAFKA-15240:
---

[~lushilin] Thanks for reporting this. I think [~cmccabe] [~hachikuji] would 
have the most context to help investigate.

> BrokerToControllerChannelManager cache activeController error cause 
> DefaultAlterPartitionManager send AlterPartition request failed
> ---
>
> Key: KAFKA-15240
> URL: https://issues.apache.org/jira/browse/KAFKA-15240
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0, 2.8.1, 2.8.2, 3.5.0
> Environment: 2.8.1 kafka version
>Reporter: shilin Lu
>Assignee: shilin Lu
>Priority: Major
> Attachments: image-2023-07-24-16-35-56-589.png
>
>
> After KIP-497, the partition leader no longer uses ZooKeeper to propagate ISR 
> changes; it sends an AlterPartition request to the controller instead. The 
> broker caches the active controller node info through the 
> controllerNodeProvider interface.
> On 2023.07.12, in a production Kafka environment, we saw many `Broker had a 
> stale broker epoch` errors when sending AlterPartition requests to the 
> controller, and many replicas in this cluster were missing from the ISR even 
> though replica fetching was working correctly. So only the ISR propagation 
> was failing.
> (inline image attachment not available)
> Something strange, though: if a broker fails to send an AlterPartition 
> request, the controller should print a log line like the one below, but no 
> such log was found on the active controller node (see attached 
> image-2023-07-24-16-35-56-589.png).
> We then suspected this broker was connected to the wrong active controller. 
> A network packet capture showed the broker sending requests to an unfamiliar 
> broker port (9092). From this cluster's operation history, the unfamiliar 
> broker turned out to be an old node of this cluster that is now a controller 
> node in a new Kafka cluster.
> Currently, BrokerToControllerChannelManager only updates the cached active 
> controller on disconnect or when the response code is NOT_CONTROLLER. So when 
> no requests are being sent and the stale node is another cluster's 
> controller, this situation repeats indefinitely.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ahuang98 commented on a diff in pull request #14084: [MINOR] Add latest versions to kraft upgrade kafkatest

2023-07-27 Thread via GitHub


ahuang98 commented on code in PR #14084:
URL: https://github.com/apache/kafka/pull/14084#discussion_r1276628397


##
tests/kafkatest/version.py:
##
@@ -252,3 +252,7 @@ def get_version(node=None):
 V_3_5_0 = KafkaVersion("3.5.0")
 V_3_5_1 = KafkaVersion("3.5.1")
 LATEST_3_5 = V_3_5_1
+
+# 3.6.x versions
+V_3_6_0 = KafkaVersion("3.6.0")
+LATEST_3_6 = V_3_6_0

Review Comment:
   I had noticed that 3.6 had been removed when I was working on this change 
and just thought I'd add it back in since it seems the convention is to add the 
new version whenever we cut the release branch for the prior version. Other 
than some OCD to follow the existing pattern, I'm indifferent to keeping it or 
leaving it out since as you said, it's not in use currently



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #14115: KAFKA-15263 Check KRaftMigrationDriver state in each event

2023-07-27 Thread via GitHub


mumrah commented on code in PR #14115:
URL: https://github.com/apache/kafka/pull/14115#discussion_r1276619004


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -297,6 +297,16 @@ private boolean isValidStateChange(MigrationDriverState 
newState) {
 }
 }
 
+private boolean checkDriverState(MigrationDriverState expectedState) {
+if (migrationState.equals(expectedState)) {
+return true;
+} else {
+log.debug("Expected driver state {} but found {}. Not running this 
event {}.",

Review Comment:
   It shouldn't happen often, no. INFO sounds good to me



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14115: KAFKA-15263 Check KRaftMigrationDriver state in each event

2023-07-27 Thread via GitHub


cmccabe commented on code in PR #14115:
URL: https://github.com/apache/kafka/pull/14115#discussion_r1276615552


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -297,6 +297,16 @@ private boolean isValidStateChange(MigrationDriverState 
newState) {
 }
 }
 
+private boolean checkDriverState(MigrationDriverState expectedState) {
+if (migrationState.equals(expectedState)) {
+return true;
+} else {
+log.debug("Expected driver state {} but found {}. Not running this 
event {}.",

Review Comment:
   Should this be `info`? It won't happen often, will it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #14115: KAFKA-15263 Check KRaftMigrationDriver state in each event

2023-07-27 Thread via GitHub


mumrah commented on code in PR #14115:
URL: https://github.com/apache/kafka/pull/14115#discussion_r1276614670


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -599,6 +604,9 @@ public void run() throws Exception {
 class MigrateMetadataEvent extends MigrationEvent {
 @Override
 public void run() throws Exception {
+if (!checkDriverState(MigrationDriverState.ZK_MIGRATION)) {
+return;
+}

Review Comment:
   This is the actual fix. The rest of the changes are for 
cleanliness/readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


rreddy-22 commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276612481


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##
@@ -612,15 +616,14 @@ public void 
testNewGroupMetadataRecordThrowsWhenEmptyAssignment() {
 MetadataVersion.IBP_3_5_IV2
 ));
 }
-

Review Comment:
   just a line after the test, I think it can stay



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #14084: [MINOR] Add latest versions to kraft upgrade kafkatest

2023-07-27 Thread via GitHub


rondagostino commented on code in PR #14084:
URL: https://github.com/apache/kafka/pull/14084#discussion_r1276607523


##
tests/kafkatest/version.py:
##
@@ -252,3 +252,7 @@ def get_version(node=None):
 V_3_5_0 = KafkaVersion("3.5.0")
 V_3_5_1 = KafkaVersion("3.5.1")
 LATEST_3_5 = V_3_5_1
+
+# 3.6.x versions
+V_3_6_0 = KafkaVersion("3.6.0")
+LATEST_3_6 = V_3_6_0

Review Comment:
   Seems odd to add this now since, as pointed out in 
https://github.com/apache/kafka/pull/13654#discussion_r1270835742, it won't be 
used.  Seems this makes most sense to add when we bump `__version__ = 
'3.7.0.dev0'` in `tests/kafkatest/__init__.py`.
   
   To be clear (and to reiterate what is mentioned in that linked comment), it 
doesn't hurt to add this.  But I'm wondering if I'm missing something since you 
decided to add it back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah opened a new pull request, #14115: KAFKA-15263 Check KRaftMigrationDriver state in each event

2023-07-27 Thread via GitHub


mumrah opened a new pull request, #14115:
URL: https://github.com/apache/kafka/pull/14115

   To avoid processing events once we have transitioned to a new state, add a 
check at the beginning of the events created by PollEvent in 
KRaftMigrationDriver.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on pull request #14114: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs

2023-07-27 Thread via GitHub


kamalcph commented on PR #14114:
URL: https://github.com/apache/kafka/pull/14114#issuecomment-1654008152

   @satishd @divijvaidya @showuon 
   
   PTAL when you get a chance!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15263) KRaftMigrationDriver can run the migration twice

2023-07-27 Thread David Arthur (Jira)
David Arthur created KAFKA-15263:


 Summary: KRaftMigrationDriver can run the migration twice
 Key: KAFKA-15263
 URL: https://issues.apache.org/jira/browse/KAFKA-15263
 Project: Kafka
  Issue Type: Bug
Reporter: David Arthur
Assignee: David Arthur


There is a narrow race condition in KRaftMigrationDriver where a PollEvent can 
run that sees the internal state as ZK_MIGRATION and is immediately followed by 
another poll event (due to an external call to wakeup()) that results in 
two MigrateMetadataEvents being enqueued. 

Since MigrateMetadataEvent lacks a check on the internal state, this causes the 
metadata migration to occur twice. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


rreddy-22 commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276573086


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java:
##
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The subscribed topic metadata class is used by the {@link 
PartitionAssignor} to obtain
+ * topic and partition metadata for the topics that the consumer group is 
subscribed to.
+ */
+public class SubscribedTopicMetadata implements SubscribedTopicDescriber {

Review Comment:
   done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph opened a new pull request, #14114: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs

2023-07-27 Thread via GitHub


kamalcph opened a new pull request, #14114:
URL: https://github.com/apache/kafka/pull/14114

   **Topic -> Broker Synonym**
   
   - local.retention.bytes -> log.local.retention.bytes 
   - local.retention.ms -> log.local.retention.ms
   
   We cannot add a synonym for the `remote.storage.enable` topic-level config, 
as it depends on 
[KIP-950](https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement)
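   
   For illustration, a hedged sketch of how the synonyms would compose (the 
values are arbitrary examples): broker-level defaults set in server.properties, 
which individual topics can still override with the topic-level names.
   
       # server.properties: broker-level defaults using the new synonyms
       log.local.retention.ms=86400000
       log.local.retention.bytes=1073741824
       # per-topic overrides keep the existing topic-level names:
       # local.retention.ms / local.retention.bytes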
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan merged pull request #14088: MINOR: Adjust Invalid Record Exception for Invalid Txn State as mentioned in KIP-890

2023-07-27 Thread via GitHub


jolshan merged PR #14088:
URL: https://github.com/apache/kafka/pull/14088


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #14049: KAFKA-14038: Optimise calculation of size for log in remote tier

2023-07-27 Thread via GitHub


clolov commented on code in PR #14049:
URL: https://github.com/apache/kafka/pull/14049#discussion_r1276543815


##
storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java:
##
@@ -74,6 +74,11 @@ public void 
onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
 public void onStopPartitions(Set<TopicIdPartition> partitions) {
 }
 
+@Override
+public long remoteLogSize(TopicIdPartition topicPartition, int 
leaderEpoch) {

Review Comment:
   Great catch! Thank you!
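   
   For context, a hedged sketch of how a non-no-op RLMM implementation might 
compute this by summing segment sizes from the existing listing API (assuming 
listRemoteLogSegments and segmentSizeInBytes behave as in the 
RemoteLogMetadataManager contract, and that the method may declare 
RemoteStorageException like the other listing APIs):
   
       @Override
       public long remoteLogSize(TopicIdPartition topicPartition, int leaderEpoch)
               throws RemoteStorageException {
           long remoteLogSize = 0L;
           // Sum the sizes of all remote segments for this partition/epoch.
           Iterator<RemoteLogSegmentMetadata> segments =
               listRemoteLogSegments(topicPartition, leaderEpoch);
           while (segments.hasNext()) {
               remoteLogSize += segments.next().segmentSizeInBytes();
           }
           return remoteLogSize;
       }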



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-27 Thread via GitHub


philipnee commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1653959801

   Hmm - you can try.  But I've had similar, if not the same, failures and I 
haven't had a chance to examine the cause.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] abhijeetk88 opened a new pull request, #14113: KAFKA-15260: RLM Task should wait for RLMM to initialize

2023-07-27 Thread via GitHub


abhijeetk88 opened a new pull request, #14113:
URL: https://github.com/apache/kafka/pull/14113

   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15262) MirrorHeartbeatConnector is not working as documented

2023-07-27 Thread Ravindranath Kakarla (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravindranath Kakarla updated KAFKA-15262:
-
Description: 
As per the MM2 
[KIP-382|https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0],
 the MirrorHeartbeatConnector should emit pings to the heartbeat topic on the 
source cluster, which then gets replicated to the target cluster. This can be 
used to demonstrate that MM2 is replicating the data.

However, this is not happening right now. To the contrary, the 
MirrorHeartbeatConnector is producing heartbeat pings to the target cluster 
instead of the source. This is not very useful, as it won't help detect 
problems connecting to the source cluster or with the data replication.

Is my understanding of the MirrorHeartbeatConnector accurate?

*Reference:*

_MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to 
demonstrate connectivity through the connectors. Downstream consumers can use 
this topic to verify that a) the connector is running and b) the corresponding 
source cluster is available. Heartbeats will get propagated by source and sink 
connectors s.t. chains like backup.us-west.us-east.heartbeat are possible._

[Code 
Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]

 

  was:
As per the MM2 
[KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]]
 the MirrorHeartbeatConnector should emit pings to heartbeat topic on the 
source cluster which then gets replicated to the target cluster. This can be 
used to demonstrate that MM2 is replicating the data.
h2. _Internal Topics_

_MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to 
demonstrate connectivity through the connectors. Downstream consumers can use 
this topic to verify that a) the connector is running and b) the corresponding 
source cluster is available. Heartbeats will get propagated by source and sink 
connectors s.t. chains like backup.us-west.us-east.heartbeat are possible._

However, this is not happening right now. To contrary, the 
MirrorHeartbeatConnector is producing heartbeat pings to target cluster instead 
of source. This is not much useful as it won't help detect problems connecting 
to source cluster or with the data replication.

 

Is my understanding of the MirrorHeartbeatConnector accurate?

[Code 
Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]

 


> MirrorHeartbeatConnector is not working as documented
> -
>
> Key: KAFKA-15262
> URL: https://issues.apache.org/jira/browse/KAFKA-15262
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0, 3.4.0, 3.5.0
>Reporter: Ravindranath Kakarla
>Priority: Major
>
> As per the MM2 
> [KIP-382|https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0],
>  the MirrorHeartbeatConnector should emit pings to the heartbeat topic on the 
> source cluster, which then gets replicated to the target cluster. This can be 
> used to demonstrate that MM2 is replicating the data.
> However, this is not happening right now. To the contrary, the 
> MirrorHeartbeatConnector is producing heartbeat pings to the target cluster 
> instead of the source. This is not very useful, as it won't help detect 
> problems connecting to the source cluster or with the data replication.
> Is my understanding of the MirrorHeartbeatConnector accurate?
> *Reference:*
> _MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated 
> to demonstrate connectivity through the connectors. Downstream consumers can 
> use this topic to verify that a) the connector is running and b) the 
> corresponding source cluster is available. Heartbeats will get propagated by 
> source and sink connectors s.t. chains like backup.us-west.us-east.heartbeat 
> are possible._
> [Code 
> Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]
>  
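
For readers who want to verify this behaviour, a minimal probe is to consume
the MM2 heartbeats topic on each cluster and see where pings actually appear.
This is a hedged sketch; the topic name ("heartbeats" is the MM2 default) and
the bootstrap address are assumptions.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public final class HeartbeatProbe {
    public static void main(String[] args) {
        String bootstrap = args.length > 0 ? args[0] : "localhost:9092"; // source or target cluster
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "heartbeat-probe");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("heartbeats"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
            // If the connector behaved as documented, pings would show up on the source cluster.
            System.out.printf("Saw %d heartbeat record(s) on %s%n", records.count(), bootstrap);
        }
    }
}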



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


rreddy-22 commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1276487843


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -624,8 +745,27 @@ public void testDeleteMember() {
 20
 );
 
-Uuid fooTopicId = context.addTopicMetadata("foo", 6);
-Uuid barTopicId = context.addTopicMetadata("bar", 6);
+Uuid fooTopicId = context.addTopicMetadata("foo", 6,
+mkMap(
+mkEntry(0, Collections.emptySet()),
+mkEntry(1, Collections.emptySet()),
+mkEntry(2, Collections.emptySet()),
+mkEntry(3, Collections.emptySet()),
+mkEntry(4, Collections.emptySet()),
+mkEntry(5, Collections.emptySet())
+)
+);
+
+Uuid barTopicId = context.addTopicMetadata("bar", 6,

Review Comment:
   Added racks to one of the tests, `testPartialAssignmentUpdate`, and removed 
them for the rest.
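
   Illustratively, "adding racks" in these fixtures just means replacing the
empty sets with partition-to-replica-rack sets; the rack names below are made
up (a sketch, not code from this PR):

import java.util.Map;
import java.util.Set;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;

public final class RackMapExample {
    // Partition index -> racks hosting that partition's replicas.
    static final Map<Integer, Set<String>> RACKS_FOR_FOO = mkMap(
        mkEntry(0, Set.of("rack0", "rack1")),
        mkEntry(1, Set.of("rack1", "rack2")),
        mkEntry(2, Set.of("rack0", "rack2"))
    );
}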



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-27 Thread via GitHub


rreddy-22 commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1275444245


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -624,8 +745,27 @@ public void testDeleteMember() {
 20
 );
 
-Uuid fooTopicId = context.addTopicMetadata("foo", 6);
-Uuid barTopicId = context.addTopicMetadata("bar", 6);
+Uuid fooTopicId = context.addTopicMetadata("foo", 6,
+mkMap(
+mkEntry(0, Collections.emptySet()),
+mkEntry(1, Collections.emptySet()),
+mkEntry(2, Collections.emptySet()),
+mkEntry(3, Collections.emptySet()),
+mkEntry(4, Collections.emptySet()),
+mkEntry(5, Collections.emptySet())
+)
+);
+
+Uuid barTopicId = context.addTopicMetadata("bar", 6,

Review Comment:
   
https://github.com/apache/kafka/pull/14099/files/70a6214809fba59511dab58a1da21f74be4edbae#r1275362470



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15262) MirrorHeartbeatConnector is not working as documented

2023-07-27 Thread Ravindranath Kakarla (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravindranath Kakarla updated KAFKA-15262:
-
Description: 
As per the MM2 
[KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]]
 the MirrorHeartbeatConnector should emit pings to heartbeat topic on the 
source cluster which then gets replicated to the target cluster. This can be 
used to demonstrate that MM2 is replicating the data.
h2. _Internal Topics_

_MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to 
demonstrate connectivity through the connectors. Downstream consumers can use 
this topic to verify that a) the connector is running and b) the corresponding 
source cluster is available. Heartbeats will get propagated by source and sink 
connectors s.t. chains like backup.us-west.us-east.heartbeat are possible._

However, this is not happening right now. To contrary, the 
MirrorHeartbeatConnector is producing heartbeat pings to target cluster instead 
of source. This is not much useful as it won't help detect problems connecting 
to source cluster or with the data replication.

 

Is my understanding of the MirrorHeartbeatConnector accurate?

[Code 
Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]

 

  was:
As per the MM2 
[KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]]
 the MirrorHeartbeatConnector should emit pings to heartbeat topic on the 
source cluster which then gets replicated to the target cluster. This can be 
used to demonstrate that MM2 is replicating the data.
h2. _Internal Topics_

_MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to 
demonstrate connectivity through the connectors. Downstream consumers can use 
this topic to verify that a) the connector is running and b) the corresponding 
source cluster is available. Heartbeats will get propagated by source and sink 
connectors s.t. chains like backup.us-west.us-east.heartbeat are possible._

 

 

However, this is not happening right now. To contrary, the 
MirrorHeartbeatConnector is producing heartbeat pings to target cluster instead 
of source. This is not much useful as it won't help detect problems connecting 
to source cluster or with the data replication.

 

Is my understanding of the MirrorHeartbeatConnector accurate?

[Code 
Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]

 


> MirrorHeartbeatConnector is not working as documented
> -
>
> Key: KAFKA-15262
> URL: https://issues.apache.org/jira/browse/KAFKA-15262
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0, 3.4.0, 3.5.0
>Reporter: Ravindranath Kakarla
>Priority: Major
>
> As per the MM2 
> [KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]]
>  the MirrorHeartbeatConnector should emit pings to heartbeat topic on the 
> source cluster which then gets replicated to the target cluster. This can be 
> used to demonstrate that MM2 is replicating the data.
> h2. _Internal Topics_
> _MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated 
> to demonstrate connectivity through the connectors. Downstream consumers can 
> use this topic to verify that a) the connector is running and b) the 
> corresponding source cluster is available. Heartbeats will get propagated by 
> source and sink connectors s.t. chains like backup.us-west.us-east.heartbeat 
> are possible._
> However, this is not happening right now. To contrary, the 
> MirrorHeartbeatConnector is producing heartbeat pings to target cluster 
> instead of source. This is not much useful as it won't help detect problems 
> connecting to source cluster or with the data replication.
>  
> Is my understanding of the MirrorHeartbeatConnector accurate?
> [Code 
> Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15262) MirrorHeartbeatConnector is not working as documented

2023-07-27 Thread Ravindranath Kakarla (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravindranath Kakarla updated KAFKA-15262:
-
Description: 
As per the MM2 
[KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]]
 the MirrorHeartbeatConnector should emit pings to heartbeat topic on the 
source cluster which then gets replicated to the target cluster. This can be 
used to demonstrate that MM2 is replicating the data.
h2. _Internal Topics_

_MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to 
demonstrate connectivity through the connectors. Downstream consumers can use 
this topic to verify that a) the connector is running and b) the corresponding 
source cluster is available. Heartbeats will get propagated by source and sink 
connectors s.t. chains like backup.us-west.us-east.heartbeat are possible._

 

 

However, this is not happening right now. To contrary, the 
MirrorHeartbeatConnector is producing heartbeat pings to target cluster instead 
of source. This is not much useful as it won't help detect problems connecting 
to source cluster or with the data replication.

 

Is my understanding of the MirrorHeartbeatConnector accurate?

[Code 
Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]

 

  was:
As per the MM2 [KIP-382 | 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]] 
the MirrorHeartbeatConnector should emit pings to heartbeat topic on the source 
cluster which then gets replicated to the target cluster. This can be used to 
demonstrate that MM2 is replicating the data.
h2. _Internal Topics_

_MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to 
demonstrate connectivity through the connectors. Downstream consumers can use 
this topic to verify that a) the connector is running and b) the corresponding 
source cluster is available. Heartbeats will get propagated by source and sink 
connectors s.t. chains like backup.us-west.us-east.heartbeat are possible._

 

 

However, this is not happening right now. To contrary, the 
MirrorHeartbeatConnector is producing heartbeat pings to target cluster instead 
of source. This is not much useful as it won't help detect problems connecting 
to source cluster or with the data replication.

 

Is my understanding of the MirrorHeartbeatConnector accurate?

[Code 
Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]

 


> MirrorHeartbeatConnector is not working as documented
> -
>
> Key: KAFKA-15262
> URL: https://issues.apache.org/jira/browse/KAFKA-15262
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0, 3.4.0, 3.5.0
>Reporter: Ravindranath Kakarla
>Priority: Major
>
> As per the MM2 
> [KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]]
>  the MirrorHeartbeatConnector should emit pings to heartbeat topic on the 
> source cluster which then gets replicated to the target cluster. This can be 
> used to demonstrate that MM2 is replicating the data.
> h2. _Internal Topics_
> _MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated 
> to demonstrate connectivity through the connectors. Downstream consumers can 
> use this topic to verify that a) the connector is running and b) the 
> corresponding source cluster is available. Heartbeats will get propagated by 
> source and sink connectors s.t. chains like backup.us-west.us-east.heartbeat 
> are possible._
>  
>  
> However, this is not happening right now. To contrary, the 
> MirrorHeartbeatConnector is producing heartbeat pings to target cluster 
> instead of source. This is not much useful as it won't help detect problems 
> connecting to source cluster or with the data replication.
>  
> Is my understanding of the MirrorHeartbeatConnector accurate?
> [Code 
> Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15262) MirrorHeartbeatConnector is not working as documented

2023-07-27 Thread Ravindranath Kakarla (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravindranath Kakarla updated KAFKA-15262:
-
Description: 
As per the MM2 [KIP-382 | 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]] 
the MirrorHeartbeatConnector should emit pings to heartbeat topic on the source 
cluster which then gets replicated to the target cluster. This can be used to 
demonstrate that MM2 is replicating the data.
h2. _Internal Topics_

_MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to 
demonstrate connectivity through the connectors. Downstream consumers can use 
this topic to verify that a) the connector is running and b) the corresponding 
source cluster is available. Heartbeats will get propagated by source and sink 
connectors s.t. chains like backup.us-west.us-east.heartbeat are possible._

 

 

However, this is not happening right now. To contrary, the 
MirrorHeartbeatConnector is producing heartbeat pings to target cluster instead 
of source. This is not much useful as it won't help detect problems connecting 
to source cluster or with the data replication.

 

Is my understanding of the MirrorHeartbeatConnector accurate?

[Code 
Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]

 

  was:
As per the MM2 
[KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]]
 the MirrorHeartbeatConnector should emit pings to heartbeat topic on the 
source cluster which then gets replicated to the target cluster. This can be 
used to demonstrate that MM2 is replicating the data.
h2. _Internal Topics_

_MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to 
demonstrate connectivity through the connectors. Downstream consumers can use 
this topic to verify that a) the connector is running and b) the corresponding 
source cluster is available. Heartbeats will get propagated by source and sink 
connectors s.t. chains like backup.us-west.us-east.heartbeat are possible._

 

 

However, this is not happening right now. To contrary, the 
MirrorHeartbeatConnector is producing heartbeat pings to target cluster instead 
of source. This is not much useful as it won't help detect problems connecting 
to source cluster or with the data replication.

 

Is my understanding of the MirrorHeartbeatConnector accurate?

[Code 
Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]

 


> MirrorHeartbeatConnector is not working as documented
> -
>
> Key: KAFKA-15262
> URL: https://issues.apache.org/jira/browse/KAFKA-15262
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0, 3.4.0, 3.5.0
>Reporter: Ravindranath Kakarla
>Priority: Major
>
> As per the MM2 [KIP-382 | 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]]
>  the MirrorHeartbeatConnector should emit pings to heartbeat topic on the 
> source cluster which then gets replicated to the target cluster. This can be 
> used to demonstrate that MM2 is replicating the data.
> h2. _Internal Topics_
> _MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated 
> to demonstrate connectivity through the connectors. Downstream consumers can 
> use this topic to verify that a) the connector is running and b) the 
> corresponding source cluster is available. Heartbeats will get propagated by 
> source and sink connectors s.t. chains like backup.us-west.us-east.heartbeat 
> are possible._
>  
>  
> However, this is not happening right now. To contrary, the 
> MirrorHeartbeatConnector is producing heartbeat pings to target cluster 
> instead of source. This is not much useful as it won't help detect problems 
> connecting to source cluster or with the data replication.
>  
> Is my understanding of the MirrorHeartbeatConnector accurate?
> [Code 
> Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15262) MirrorHeartbeatConnector is not working as documented

2023-07-27 Thread Ravindranath Kakarla (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravindranath Kakarla updated KAFKA-15262:
-
Description: 
As per the MM2 
[KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]]
 the MirrorHeartbeatConnector should emit pings to heartbeat topic on the 
source cluster which then gets replicated to the target cluster. This can be 
used to demonstrate that MM2 is replicating the data.
h2. _Internal Topics_

_MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to 
demonstrate connectivity through the connectors. Downstream consumers can use 
this topic to verify that a) the connector is running and b) the corresponding 
source cluster is available. Heartbeats will get propagated by source and sink 
connectors s.t. chains like backup.us-west.us-east.heartbeat are possible._

 

 

However, this is not happening right now. To contrary, the 
MirrorHeartbeatConnector is producing heartbeat pings to target cluster instead 
of source. This is not much useful as it won't help detect problems connecting 
to source cluster or with the data replication.

 

Is my understanding of the MirrorHeartbeatConnector accurate?

[Code 
Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]

 

  was:
As per the MM2 
[KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0],]
 the MirrorHeartbeatConnector should emit pings to heartbeat topic on the 
source cluster which then gets replicated to the target cluster. This can be 
used to demonstrate that MM2 is replicating the data.

"""
h2. Internal Topics

MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to 
demonstrate connectivity through the connectors. Downstream consumers can use 
this topic to verify that a) the connector is running and b) the corresponding 
source cluster is available. Heartbeats will get propagated by source and sink 
connectors s.t. chains like backup.us-west.us-east.heartbeat are possible.

"""

 

However, this is not happening right now. To contrary, the 
MirrorHeartbeatConnector is producing heartbeat pings to target cluster instead 
of source. This is not much useful as it won't help detect problems connecting 
to source cluster or with the data replication.

 

Is my understanding of the MirrorHeartbeatConnector accurate?

[Code 
Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]

 


> MirrorHeartbeatConnector is not working as documented
> -
>
> Key: KAFKA-15262
> URL: https://issues.apache.org/jira/browse/KAFKA-15262
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0, 3.4.0, 3.5.0
>Reporter: Ravindranath Kakarla
>Priority: Major
>
> As per the MM2 
> [KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]]
>  the MirrorHeartbeatConnector should emit pings to heartbeat topic on the 
> source cluster which then gets replicated to the target cluster. This can be 
> used to demonstrate that MM2 is replicating the data.
> h2. _Internal Topics_
> _MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated 
> to demonstrate connectivity through the connectors. Downstream consumers can 
> use this topic to verify that a) the connector is running and b) the 
> corresponding source cluster is available. Heartbeats will get propagated by 
> source and sink connectors s.t. chains like backup.us-west.us-east.heartbeat 
> are possible._
>  
>  
> However, this is not happening right now. To contrary, the 
> MirrorHeartbeatConnector is producing heartbeat pings to target cluster 
> instead of source. This is not much useful as it won't help detect problems 
> connecting to source cluster or with the data replication.
>  
> Is my understanding of the MirrorHeartbeatConnector accurate?
> [Code 
> Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15262) MirrorHeartbeatConnector is not working as documented

2023-07-27 Thread Ravindranath Kakarla (Jira)
Ravindranath Kakarla created KAFKA-15262:


 Summary: MirrorHeartbeatConnector is not working as documented
 Key: KAFKA-15262
 URL: https://issues.apache.org/jira/browse/KAFKA-15262
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.5.0, 3.4.0, 2.8.0
Reporter: Ravindranath Kakarla


As per the MM2 
[KIP-382|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0],]
 the MirrorHeartbeatConnector should emit pings to heartbeat topic on the 
source cluster which then gets replicated to the target cluster. This can be 
used to demonstrate that MM2 is replicating the data.

"""
h2. Internal Topics

MM2 emits a *heartbeat* *topic* in each source cluster, which is replicated to 
demonstrate connectivity through the connectors. Downstream consumers can use 
this topic to verify that a) the connector is running and b) the corresponding 
source cluster is available. Heartbeats will get propagated by source and sink 
connectors s.t. chains like backup.us-west.us-east.heartbeat are possible.

"""

 

However, this is not happening right now. To contrary, the 
MirrorHeartbeatConnector is producing heartbeat pings to target cluster instead 
of source. This is not much useful as it won't help detect problems connecting 
to source cluster or with the data replication.

 

Is my understanding of the MirrorHeartbeatConnector accurate?

[Code 
Ref|https://github.com/apache/kafka/blob/ed44bcd71b3b9926c474033882eaa6c1cf35cfa4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java#L65]

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14509) Add ConsumerGroupDescribe API

2023-07-27 Thread Max Riedel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748216#comment-17748216
 ] 

Max Riedel edited comment on KAFKA-14509 at 7/27/23 3:40 PM:
-

Hey [~dajac],

I started implementing the request/response schemas and classes and have 
several questions:
 # I changed the ConsumerGroupDescribeResponse.json schema compared to what is 
stated 
[here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI].
 I think the changes are necessary to make it compile, but I'm not entirely 
sure I understood everything correctly.
 # How do I set errorCode in ConsumerGroupDescribeRequest correctly? Other 
Request objects have a single errorCode field, while in this case we have a 
list of DescribedGroup entries that can have individual errorCodes. Without 
setting this correctly, I can't make the testSerialization test in 
RequestResponseTest.java pass.
 # This also makes errorCounts in ConsumerGroupDescribeResponse more 
complicated, and I'm not sure I did the right thing.

You can have a look at my first draft 
[here|https://github.com/riedelmax/kafka/commit/e1818eee1347743ac71237e1db16de57231dfc11].
 Please let us discuss my questions soon :)


was (Author: JIRAUSER300902):
Hey [~dajac],

I started implementing the request/response schemas and classes and have 
several questions:
 # I changed the ConsumerGroupDescribeResponse.json schema compared to what is 
stated 
[here|#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI].]
 I think the changes are necessary to make it compile but I'm not entirely sure 
if I understood everything correctly.
 # How do I set errorCode in ConsumerGroupDescribeRequest correctly? Other 
Request objects have a field errorCode, while in this case we have a list of 
DescribedGroup which can have individual errorCodes. Without setting this 
correctly I can't make testSerialization test in RequestResponseTest.java work.
 # This also makes errorCounts in ConsumerGroupDescribeResponse more 
complicated and I'm not sure if I did the right thing.

You can have a look into my first draft 
[here|[https://github.com/riedelmax/kafka/commit/e1818eee1347743ac71237e1db16de57231dfc11]].
 Please let us discuss my questions soon :)

> Add ConsumerGroupDescribe API
> -
>
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Max Riedel
>Priority: Major
>
> The goal of this task is to implement the ConsumerGroupDescribe API as 
> described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI];
>  and to implement the related changes in the admin client as described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups].
> On the server side, this mainly requires the following steps:
>  # The request/response schemas must be defined (see 
> ListGroupsRequest/Response.json for an example);
>  # Request/response classes must be defined (see 
> ListGroupsRequest/Response.java for an example);
>  # The API must be defined in KafkaApis (see 
> KafkaApis#handleDescribeGroupsRequest for an example);
>  # The GroupCoordinator interface (java file) must be extended for the new 
> operations.
>  # The new operation must be implemented in GroupCoordinatorService (new 
> coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in 
> Scala) should just reject the request.
> We could probably do 1) and 2) in one pull request and the remaining ones in 
> another.
> On the admin client side, this mainly requires the following steps:
>  * Define all the new java classes as defined in the KIP.
>  * Add the new API to KafkaAdminClient class.
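
On question 3 above (errorCounts in ConsumerGroupDescribeResponse): responses
that carry per-group error codes typically aggregate them. A hedged sketch of
that aggregation, modeled on how other group responses count errors; the
surrounding response class and its data fields are assumptions:

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.kafka.common.protocol.Errors;

// Hedged sketch: fold a stream of per-group error codes into the
// Errors -> count map that errorCounts() is expected to return.
public final class ErrorCountsSketch {
    public static Map<Errors, Integer> errorCounts(Stream<Short> groupErrorCodes) {
        Map<Errors, Integer> counts = new HashMap<>();
        groupErrorCodes.forEach(code ->
            counts.merge(Errors.forCode(code), 1, Integer::sum));
        return counts;
    }
}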



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14509) Add ConsumerGroupDescribe API

2023-07-27 Thread Max Riedel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748216#comment-17748216
 ] 

Max Riedel edited comment on KAFKA-14509 at 7/27/23 3:39 PM:
-

Hey [~dajac],

I started implementing the request/response schemas and classes and have 
several questions:
 # I changed the ConsumerGroupDescribeResponse.json schema compared to what is 
stated 
[here|#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI].]
 I think the changes are necessary to make it compile but I'm not entirely sure 
if I understood everything correctly.
 # How do I set errorCode in ConsumerGroupDescribeRequest correctly? Other 
Request objects have a field errorCode, while in this case we have a list of 
DescribedGroup which can have individual errorCodes. Without setting this 
correctly I can't make testSerialization test in RequestResponseTest.java work.
 # This also makes errorCounts in ConsumerGroupDescribeResponse more 
complicated and I'm not sure if I did the right thing.

You can have a look into my first draft 
[here|[https://github.com/riedelmax/kafka/commit/e1818eee1347743ac71237e1db16de57231dfc11]].
 Please let us discuss my questions soon :)


was (Author: JIRAUSER300902):
Hey [~dajac],

I started implementing the request/response schemas and classes and have 
several questions:
 # I changed the ConsumerGroupDescribeResponse.json schema compared to what is 
stated 
[here|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI].]
 I think the changes are necessary to make it compile but I'm not entirely sure 
if I understood everything correctly.
 # How do I set errorCode in ConsumerGroupDescribeRequest correctly? Other 
Request objects have a field errorCode, while in this case we have a list of 
DescribedGroup which can have individual errorCodes. Without setting this 
correctly I can't make testSerialization test in RequestResponseTest.java work.
 # This also makes errorCounts in ConsumerGroupDescribeResponse more 
complicated and I'm not sure if I did the right thing.



You can have a look into my first draft 
[here|[https://github.com/riedelmax/kafka/commit/e1818eee1347743ac71237e1db16de57231dfc11]|https://github.com/riedelmax/kafka/commit/e1818eee1347743ac71237e1db16de57231dfc11].
 Please let us discuss my questions soon :)

> Add ConsumerGroupDescribe API
> -
>
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Max Riedel
>Priority: Major
>
> The goal of this task is to implement the ConsumerGroupDescribe API as 
> described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI];
>  and to implement the related changes in the admin client as described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups].
> On the server side, this mainly requires the following steps:
>  # The request/response schemas must be defined (see 
> ListGroupsRequest/Response.json for an example);
>  # Request/response classes must be defined (see 
> ListGroupsRequest/Response.java for an example);
>  # The API must be defined in KafkaApis (see 
> KafkaApis#handleDescribeGroupsRequest for an example);
>  # The GroupCoordinator interface (java file) must be extended for the new 
> operations.
>  # The new operation must be implemented in GroupCoordinatorService (new 
> coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in 
> Scala) should just reject the request.
> We could probably do 1) and 2) in one pull request and the remaining ones in 
> another.
> On the admin client side, this mainly requires the following steps:
>  * Define all the new java classes as defined in the KIP.
>  * Add the new API to KafkaAdminClient class.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14509) Add ConsumerGroupDescribe API

2023-07-27 Thread Max Riedel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748216#comment-17748216
 ] 

Max Riedel commented on KAFKA-14509:


Hey [~dajac],

I started implementing the request/response schemas and classes and have 
several questions:
 # I changed the ConsumerGroupDescribeResponse.json schema compared to what is 
stated 
[here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI].
 I think the changes are necessary to make it compile but I'm not entirely sure 
if I understood everything correctly.
 # How do I set errorCode in ConsumerGroupDescribeRequest correctly? Other 
Request objects have a field errorCode, while in this case we have a list of 
DescribedGroup which can have individual errorCodes. Without setting this 
correctly I can't make testSerialization test in RequestResponseTest.java work.
 # This also makes errorCounts in ConsumerGroupDescribeResponse more 
complicated and I'm not sure if I did the right thing.

You can have a look into my first draft 
[here|https://github.com/riedelmax/kafka/commit/e1818eee1347743ac71237e1db16de57231dfc11].
 Please let us discuss my questions soon :)

> Add ConsumerGroupDescribe API
> -
>
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Max Riedel
>Priority: Major
>
> The goal of this task is to implement the ConsumerGroupDescribe API as 
> described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI];
>  and to implement the related changes in the admin client as described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups].
> On the server side, this mainly requires the following steps:
>  # The request/response schemas must be defined (see 
> ListGroupsRequest/Response.json for an example);
>  # Request/response classes must be defined (see 
> ListGroupsRequest/Response.java for an example);
>  # The API must be defined in KafkaApis (see 
> KafkaApis#handleDescribeGroupsRequest for an example);
>  # The GroupCoordinator interface (java file) must be extended for the new 
> operations.
>  # The new operation must be implemented in GroupCoordinatorService (new 
> coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in 
> Scala) should just reject the request.
> We could probably do 1) and 2) in one pull request and the remaining ones in 
> another.
> On the admin client side, this mainly requires the following steps:
>  * Define all the new java classes as defined in the KIP.
>  * Add the new API to KafkaAdminClient class.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on pull request #13671: KAFKA-14967: fix NPE in MockAdminClient CreateTopicsResult

2023-07-27 Thread via GitHub


mimaison commented on PR #13671:
URL: https://github.com/apache/kafka/pull/13671#issuecomment-1653859939

   Thanks @hertzsprung for the PR!
   No code in Kafka relies on the content of CreateTopicsResult, and I just 
realized MockAdminClient is not part of the public API, so this is effectively 
"dead code". That said, I see that we've tried in the past to fix consistency 
issues between it and the real Admin implementation, so I think we can still 
merge this. 
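
   For anyone reproducing this, the affected path can be exercised roughly as
below. A hedged sketch: the nodes and topic are placeholders, and which
accessor hit the NPE is per the PR title, not verified here.

import java.util.List;

import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Node;

public final class MockCreateTopicsSketch {
    public static void main(String[] args) throws Exception {
        Node broker = new Node(0, "localhost", 9092);
        try (MockAdminClient admin = new MockAdminClient(List.of(broker), broker)) {
            CreateTopicsResult result =
                admin.createTopics(List.of(new NewTopic("test-topic", 1, (short) 1)));
            // Accessors on the result could previously throw NPE with the mock.
            System.out.println("topic id: " + result.topicId("test-topic").get());
        }
    }
}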


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] abhijeetk88 opened a new pull request, #14112: KAFKA-15261: Do not block replica fetcher if RLMM is not initialized

2023-07-27 Thread via GitHub


abhijeetk88 opened a new pull request, #14112:
URL: https://github.com/apache/kafka/pull/14112

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ex172000 commented on pull request #14110: MINOR: Add test for describe topic with ID

2023-07-27 Thread via GitHub


ex172000 commented on PR #14110:
URL: https://github.com/apache/kafka/pull/14110#issuecomment-1653814175

   The tests failed, but the failures are unrelated. All failing tests are 
integration tests, and this is a unit-test-only change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

2023-07-27 Thread via GitHub


jsancio commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1276417011


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2394,6 +2394,11 @@ public Optional latestSnapshotId() {
 return log.latestSnapshotId();
 }
 
+@Override
+public long logEndOffset() {
+return log.endOffset().offset;
+}

Review Comment:
   > Hmm... the method name is "logEndOffset." So it should just return the log 
end offset, right? Returning something else would be misleading.
   
   @cmccabe, I don't follow this comment. When the client calls 
`KafkaRaftClient::schedule{Atomic}Append`, the `KafkaRaftClient` compares the 
provided offset with the `nextOffset` stored in the `BatchAccumulator`. If we 
want this method to succeed in most cases, `KafkaRaftClient::logEndOffset` 
should return that offset (`BatchAccumulator::nextOffset`), not the log end 
offset.
   
   Maybe `logEndOffset` is not a great name. I am okay renaming this to 
`KafkaRaftClient::endOffset()`, but I am open to suggestions.



##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##
@@ -355,7 +355,8 @@ public void 
testAppendFailedWithRecordBatchTooLargeException() throws Exception
 for (int i = 0; i < size; i++)
 batchToLarge.add("a");
 
-assertThrows(RecordBatchTooLargeException.class, () -> 
context.client.scheduleAtomicAppend(epoch, batchToLarge));
+assertThrows(RecordBatchTooLargeException.class,
+() -> context.client.scheduleAtomicAppend(epoch, 
OptionalLong.empty(), batchToLarge));

Review Comment:
   I couldn't find a test for the new `KafkaRaftClient::scheduleAtomicAppend`.



##
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##
@@ -171,16 +172,21 @@ default void beginShutdown() {}
  * to resign its leadership. The state machine is expected to discard all
  * uncommitted entries after observing an epoch change.
  *
+ * If the current base offset does not match the supplied required base 
offset,
+ * then this method will throw {@link UnexpectedBaseOffsetException}.
+ *
  * @param epoch the current leader epoch
+ * @param requiredBaseOffset if this is set, it is the offset we must use 
as the base offset.
  * @param records the list of records to append
  * @return the expected offset of the last record if append succeed
  * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if 
the size of the records is greater than the maximum
  * batch size; if this exception is throw none of the elements in 
records were
  * committed
  * @throws NotLeaderException if we are not the current leader or the 
epoch doesn't match the leader epoch
  * @throws BufferAllocationException we failed to allocate memory for the 
records
+ * @throws UnexpectedBaseOffsetException the requested base offset could 
not be obtained.
  */
-long scheduleAtomicAppend(int epoch, List records);
+long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, 
List records);

Review Comment:
   What's the argument/reason for adding this functionality to 
`scheduleAtomicAppend` and not `scheduleAppend`?
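
   For illustration, the caller-side shape of the new API as declared in this
diff would be roughly as follows. A hedged sketch: the wrapper class, the
placeholder error handling, and the exception's package are assumptions; only
the `scheduleAtomicAppend` signature comes from the diff above.

import java.util.List;
import java.util.OptionalLong;

import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException; // import path assumed

// Pin an atomic append to an expected base offset and handle the mismatch
// case introduced by this PR.
public final class PinnedAppendSketch<T> {
    private final RaftClient<T> client;

    public PinnedAppendSketch(RaftClient<T> client) {
        this.client = client;
    }

    public long appendAt(int epoch, long expectedBaseOffset, List<T> records) {
        try {
            return client.scheduleAtomicAppend(epoch, OptionalLong.of(expectedBaseOffset), records);
        } catch (UnexpectedBaseOffsetException e) {
            // The log advanced past the expected offset; the caller should
            // re-read its state before retrying. Placeholder handling only.
            return -1L;
        }
    }
}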



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


