Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-16 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2088351150


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +329,98 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * If there is no topic, the hash value is set to 0.
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Write each topic hash in order.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        if (topicHashes.isEmpty()) {
+            return 0;
+        }
+
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by streaming XXH3.
+ * 
+ * The computed hash value for the topic is utilized in conjunction with 
the {@link #computeGroupHash(Map)}
+ * method and is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * It is important to note that if the hash algorithm is changed, the 
magic byte must be updated to reflect the
+ * new hash version.
+ * 
+ * For non-existent topics, the hash value is set to 0.
+ * For existent topics, the hashing process involves the following steps:
+ * 1. Write a magic byte to denote the version of the hash function.
+ * 2. Write the hash code of the topic ID.
+ * 3. Write the topic name.
+ * 4. Write the number of partitions associated with the topic.
+ * 5. For each partition, write the partition ID and a sorted list of rack 
identifiers.
+ *    - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
+ *
+ * @param topicName     The topic name.
+ * @param metadataImage The metadata image.
+ * @return The hash of the topic.
+ */
+    static long computeTopicHash(String topicName, MetadataImage metadataImage) {
+        TopicImage topicImage = metadataImage.topics().getTopic(topicName);
+        if (topicImage == null) {
+            return 0;
+        }
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        hasher = hasher
+            .putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte

Review Comment:
   I wonder whether we should remove the inline comments, as the code is pretty 
self-explanatory and you already explain the format in the javadoc.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +329,98 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * If there is no topic, the hash value is set to 0.
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Write each topic hash in order.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        if (topicHashes.isEmpty()) {
+            return 0;
+        }
+
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by streaming XXH3.
+ * 
+ * The computed hash value for the topic is utilized in conjunction with 
the {@link #computeGroupHash(Map)}
+ * method and is stored as part 

Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-16 Thread via GitHub


dajac commented on PR #19523:
URL: https://github.com/apache/kafka/pull/19523#issuecomment-2879230830

   @FrankYang0529 Thanks for the update. Could you please also update the 
description of the PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-13 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2087028173


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Write each topic hash in order.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by XXHash64.
+ * 
+ * The computed hash value for the topic is utilized in conjunction with 
the {@link #computeGroupHash(Map)}
+ * method and is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * It is important to note that if the hash algorithm is changed, the 
magic byte must be updated to reflect the
+ * new hash version.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Write a magic byte to denote the version of the hash function.
+ * 2. Write the hash code of the topic ID.
+ * 3. Write the topic name.
+ * 4. Write the number of partitions associated with the topic.
+ * 5. For each partition, write the partition ID and a sorted list of rack 
identifiers.
+ *    - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+    static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        hasher = hasher.putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte

Review Comment:
   Updated it.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Write each topic hash in order.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by XXHash64.
+ * 
+ * The computed hash value for the topic is utilized in conjunction with 
the {@link #computeGroupHash(Map)}
+ * method and is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * It is important to note that if the hash algorithm is changed, the 
magic byte must be updated to reflect the
+ * new hash version.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Write a magic byte to denote the version of the hash function.
+ * 2. Write the hash code of the topic ID.
+ * 3. Write the topic name.
+ * 4. Write the number of partitions associated with the topic.
+ * 5. For each partition, write the partition ID and a sorted list of rack 
identifiers.
+ *- Rack identifiers

Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-13 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2087258500


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Write each topic hash in order.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+    static long computeGroupHash(Map<String, Long> topicHashes) {

Review Comment:
   Ok. It works for me.
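For readers following along, the `computeGroupHash` flow discussed in this thread (sort by topic name, then fold each per-topic hash into the stream in order) can be sketched with the JDK alone. The `mix` function below is a simple stand-in 64-bit mixer for illustration only; the PR itself uses hash4j's `Hashing.xxh3_64().hashStream()`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class GroupHashSketch {
    // Stand-in mixer (NOT XXH3); the PR uses hash4j's streaming XXH3 instead.
    static long mix(long acc, long value) {
        acc ^= value;
        acc *= 0x9E3779B97F4A7C15L; // golden-ratio constant, a common mixing multiplier
        return Long.rotateLeft(acc, 31);
    }

    static long computeGroupHash(Map<String, Long> topicHashes) {
        if (topicHashes.isEmpty()) {
            return 0;
        }
        // Step 1: sort entries by topic name so the result does not depend on map iteration order.
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(topicHashes.entrySet());
        sorted.sort(Map.Entry.comparingByKey());

        // Step 2: fold each topic hash into the accumulator in sorted order.
        long acc = 0;
        for (Map.Entry<String, Long> e : sorted) {
            acc = mix(acc, e.getValue());
        }
        return acc;
    }
}
```

Because entries are sorted first, a `HashMap` and a `LinkedHashMap` holding the same topic hashes produce the same group hash.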






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-13 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2087259372


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.MetadataImage;
+
+import com.dynatrace.hash4j.hashing.Hashing;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class UtilsTest {
+private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+private static final String FOO_TOPIC_NAME = "foo";
+private static final String BAR_TOPIC_NAME = "bar";
+private static final int FOO_NUM_PARTITIONS = 2;
+private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+.addRacks()
+.build();
+
+@Test
+void testComputeTopicHash() throws IOException {
+        long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

Review Comment:
   I wonder if we could return 0 when the topic does not exist. Would it work?
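A minimal sketch of the suggested behavior, with the `MetadataImage`/`TopicImage` lookup stood in by a plain map. The names and types here are illustrative only, not the PR's actual API:

```java
import java.util.Map;

public class TopicHashLookupSketch {
    // Stand-in for metadataImage.topics().getTopic(topicName), which returns null for unknown topics.
    static long computeTopicHash(String topicName, Map<String, Long> topicHashesByName) {
        Long topicHash = topicHashesByName.get(topicName);
        if (topicHash == null) {
            return 0; // non-existent topic hashes to 0, as suggested in the review
        }
        return topicHash;
    }
}
```

Returning 0 keeps the caller simple: a missing topic folds into the group hash the same way as an empty group does, without a null check at every call site.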






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-13 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2087257495


##
build.gradle:
##
@@ -1414,10 +1414,12 @@ project(':group-coordinator') {
 implementation project(':coordinator-common')
 implementation libs.jacksonDatabind
 implementation libs.jacksonJDK8Datatypes
+implementation libs.lz4
 implementation libs.metrics
 implementation libs.hdrHistogram
 implementation libs.re2j
 implementation libs.slf4jApi
+implementation libs.hash4j

Review Comment:
   This is fine. Thanks.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-13 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2087024185


##
build.gradle:
##
@@ -1414,10 +1414,12 @@ project(':group-coordinator') {
 implementation project(':coordinator-common')
 implementation libs.jacksonDatabind
 implementation libs.jacksonJDK8Datatypes
+implementation libs.lz4
 implementation libs.metrics
 implementation libs.hdrHistogram
 implementation libs.re2j
 implementation libs.slf4jApi
+implementation libs.hash4j

Review Comment:
   Yes, I added it to LICENSE-binary and `python committer-tools/verify_license.py` can pass. Do I need to modify other files? Thanks.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-13 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2087031341


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Write each topic hash in order.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+    static long computeGroupHash(Map<String, Long> topicHashes) {

Review Comment:
   IIUC, this function will only be used in `GroupMetadataManager` like 
`throwIfRegularExpressionIsInvalid`. I think we can add `public` when we need 
it. WDYT?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Write each topic hash in order.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by XXHash64.
+ * 
+ * The computed hash value for the topic is utilized in conjunction with 
the {@link #computeGroupHash(Map)}
+ * method and is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * It is important to note that if the hash algorithm is changed, the 
magic byte must be updated to reflect the
+ * new hash version.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Write a magic byte to denote the version of the hash function.
+ * 2. Write the hash code of the topic ID.
+ * 3. Write the topic name.
+ * 4. Write the number of partitions associated with the topic.
+ * 5. For each partition, write the partition ID and a sorted list of rack 
identifiers.
+ *    - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+    static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {

Review Comment:
   Same as above.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-13 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2087037433


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.MetadataImage;
+
+import com.dynatrace.hash4j.hashing.Hashing;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class UtilsTest {
+private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+private static final String FOO_TOPIC_NAME = "foo";
+private static final String BAR_TOPIC_NAME = "bar";
+private static final int FOO_NUM_PARTITIONS = 2;
+private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+.addRacks()
+.build();
+
+@Test
+void testComputeTopicHash() throws IOException {
+        long result = Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

Review Comment:
   If input is topic name and metadata image, we need to handle null 
`TopicImage` in `computeTopicHash`. Do we want to handle this error in it?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.MetadataImage;
+
+import com.dynatrace.hash4j.hashing.Hashing;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class UtilsTest {
+private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+private static final String FOO_TOPIC_NAME = "foo";
+private static final String BAR_TOPIC_NAME = "bar";
+private static final int FOO_NUM_PARTITIONS = 2;
+private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+.addRacks()
+.build();
+
+@Test
+void testComputeTopicHash() throws IOException {
+long result = 
Utils.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), 
FOO_METADATA_IMAGE.cluster());
+
+long expected = Hashing.xxh3_64().hashStream()
+.putByte((byte) 0) // magic byte
+.putLong(FOO_TOPIC_ID.hashCode()) // topic ID
+.putString(FOO_TOPIC_NAME) // topic name
+.putInt(FOO_NUM_PARTITIONS) // number o

Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-13 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2087027573


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Write each topic hash in order.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by XXHash64.
+ * 
+ * The computed hash value for the topic is utilized in conjunction with 
the {@link #computeGroupHash(Map)}
+ * method and is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * It is important to note that if the hash algorithm is changed, the 
magic byte must be updated to reflect the
+ * new hash version.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Write a magic byte to denote the version of the hash function.
+ * 2. Write the hash code of the topic ID.
+ * 3. Write the topic name.
+ * 4. Write the number of partitions associated with the topic.
+ * 5. For each partition, write the partition ID and a sorted list of rack 
identifiers.
+ *    - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+    static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        hasher = hasher.putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte
+            .putLong(topicImage.id().hashCode()) // topic ID
+            .putString(topicImage.name()) // topic name
+            .putInt(topicImage.partitions().size()); // number of partitions
+
+        for (int i = 0; i < topicImage.partitions().size(); i++) {
+            hasher = hasher.putInt(i); // partition id
+            // The rack string combination cannot use a simple separator like ",", because there is no restriction on rack characters.
+            // A simple separator like "," can hit edge cases such as ",," + ",,," colliding with ",,," + ",,".
+            // Write the length before each rack string to avoid this.
+            List<String> racks = new ArrayList<>();

Review Comment:
   Yes, I initialize `racks` outside the for-loop and call `clear` before 
adding data.
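The separator edge case noted in the quoted comments is easy to reproduce with plain JDK strings: joining racks with "," cannot distinguish `["a,b"]` from `["a", "b"]`, while prefixing each value with its length keeps the encodings distinct. A small illustration (not the PR's code, which writes lengths and strings into the hash stream directly):

```java
import java.util.List;

public class RackEncodingSketch {
    // Ambiguous: different rack lists can collapse to the same string.
    static String joinWithComma(List<String> racks) {
        return String.join(",", racks);
    }

    // Unambiguous for these inputs: "<length><value>" per rack, mirroring the PR's length-prefixed format.
    static String joinWithLengthPrefix(List<String> racks) {
        StringBuilder sb = new StringBuilder();
        for (String rack : racks) {
            sb.append(rack.length()).append(rack);
        }
        return sb.toString();
    }
}
```

Here `["a,b"]` and `["a", "b"]` both join to `a,b` with commas, but encode as `3a,b` versus `1a1b` with length prefixes.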



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Write each topic hash in order.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by XXHash64.
+ * 

Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-13 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2087026067


##
build.gradle:
##
@@ -1414,10 +1414,12 @@ project(':group-coordinator') {
 implementation project(':coordinator-common')
 implementation libs.jacksonDatabind
 implementation libs.jacksonJDK8Datatypes
+implementation libs.lz4

Review Comment:
   Removed it. Thanks.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-13 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2086685337


##
build.gradle:
##
@@ -1414,10 +1414,12 @@ project(':group-coordinator') {
 implementation project(':coordinator-common')
 implementation libs.jacksonDatabind
 implementation libs.jacksonJDK8Datatypes
+implementation libs.lz4

Review Comment:
   Do we need this change?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +329,84 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Write each topic hash in order.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and partition racks by streaming XXH3.
+ * <p>
+ * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)}
+ * method and is stored as part of the metadata hash in the *GroupMetadataValue.
+ * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the
+ * new hash version.
+ * <p>
+ * The hashing process involves the following steps:
+ * 1. Write a magic byte to denote the version of the hash function.
+ * 2. Write the hash code of the topic ID.
+ * 3. Write the topic name.
+ * 4. Write the number of partitions associated with the topic.
+ * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
+ *- Rack identifiers are formatted as "length1:value1,length2:value2" to prevent issues with simple separators.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
+HashStream64 hasher = Hashing.xxh3_64().hashStream();
+hasher = hasher.putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte
+.putLong(topicImage.id().hashCode()) // topic ID
+.putString(topicImage.name()) // topic name
+.putInt(topicImage.partitions().size()); // number of partitions
+
+for (int i = 0; i < topicImage.partitions().size(); i++) {
+hasher = hasher.putInt(i); // partition id
+// The rack string combination cannot use a simple separator like ",", because there is no limitation on rack characters.
+// A simple separator like "," may hit edge cases such as ",," and ",,," / ",,," and ",,".
+// Add the length before each rack string to avoid the edge case.
+List<String> racks = new ArrayList<>();

Review Comment:
   nit: I wonder whether we could reuse this list. We could declare it before 
the loop and clear it before using it. What do you think?
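The separator edge case described in the quoted code (why each rack string is length-prefixed rather than joined with a bare ",") can be demonstrated with a stdlib-only sketch; the class, method names, and the exact ":"/"," delimiters here are illustrative, not the PR's byte layout:

```java
import java.util.List;

public class RackEncodingSketch {
    // Naive join: a rack containing "," is indistinguishable from two racks.
    static String naiveJoin(List<String> racks) {
        return String.join(",", racks);
    }

    // Length-prefixed form: each rack is written as "<length>:<value>,",
    // so element boundaries are unambiguous regardless of rack content.
    static String lengthPrefixed(List<String> racks) {
        StringBuilder sb = new StringBuilder();
        for (String rack : racks) {
            sb.append(rack.length()).append(':').append(rack).append(',');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> one = List.of("a,b");    // one rack whose name contains a comma
        List<String> two = List.of("a", "b"); // two distinct racks
        // Naive join collides: both inputs produce "a,b".
        if (!naiveJoin(one).equals(naiveJoin(two))) throw new AssertionError();
        // Length-prefixing distinguishes them: "3:a,b," vs "1:a,1:b,".
        if (lengthPrefixed(one).equals(lengthPrefixed(two))) throw new AssertionError();
        System.out.println("length-prefixing disambiguates");
    }
}
```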



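The sort-then-combine step in `computeGroupHash` makes the group hash independent of map iteration order. A stdlib-only stand-in shows the idea; the real code streams the longs into hash4j's XXH3 (`HashStream64`), while this sketch substitutes a simple polynomial mix so it runs with the JDK alone:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupHashSketch {
    // Stand-in for computeGroupHash: sort by topic name, then mix the
    // per-topic hashes in that fixed order (order-sensitive combiner).
    static long computeGroupHash(Map<String, Long> topicHashes) {
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(topicHashes.entrySet());
        sorted.sort(Map.Entry.comparingByKey()); // sort by topic name
        long hash = 0;
        for (Map.Entry<String, Long> entry : sorted) {
            hash = hash * 31 + entry.getValue(); // order-sensitive mix
        }
        return hash;
    }

    public static void main(String[] args) {
        Map<String, Long> insertionOrderA = new LinkedHashMap<>();
        insertionOrderA.put("foo", 42L);
        insertionOrderA.put("bar", 7L);
        Map<String, Long> insertionOrderB = new LinkedHashMap<>();
        insertionOrderB.put("bar", 7L);
        insertionOrderB.put("foo", 42L);
        // Same topics, different insertion order -> same group hash.
        if (computeGroupHash(insertionOrderA) != computeGroupHash(insertionOrderB)) {
            throw new AssertionError();
        }
        System.out.println("group hash is deterministic");
    }
}
```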

Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-12 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2084021758


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.image.MetadataImage;
+
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class UtilsTest {
+private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+private static final String FOO_TOPIC_NAME = "foo";
+private static final String BAR_TOPIC_NAME = "bar";
+private static final int FOO_NUM_PARTITIONS = 2;
+private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+.addRacks()
+.build();
+private static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();

Review Comment:
   I will update the PR to use hash4j with streaming XXH3. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-12 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2083981644


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##
@@ -0,0 +1,201 @@
+public class UtilsTest {
+private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+private static final String FOO_TOPIC_NAME = "foo";
+private static final String BAR_TOPIC_NAME = "bar";
+private static final int FOO_NUM_PARTITIONS = 2;
+private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+.addRacks()
+.build();
+private static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();

Review Comment:
   I confirm that it is server-side only.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-11 Thread via GitHub


ijuma commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2083606336


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##
@@ -0,0 +1,201 @@
+public class UtilsTest {
+private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+private static final String FOO_TOPIC_NAME = "foo";
+private static final String BAR_TOPIC_NAME = "bar";
+private static final int FOO_NUM_PARTITIONS = 2;
+private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+.addRacks()
+.build();
+private static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();

Review Comment:
   If this is a server-side dependency, it seems fine. If it's also 
client-side, we probably need to think more about it.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-11 Thread via GitHub


ijuma commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2083606229


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##
@@ -0,0 +1,201 @@
+public class UtilsTest {
+private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+private static final String FOO_TOPIC_NAME = "foo";
+private static final String BAR_TOPIC_NAME = "bar";
+private static final int FOO_NUM_PARTITIONS = 2;
+private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+.addRacks()
+.build();
+private static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();

Review Comment:
   It looks like a high quality library from their description of why they 
didn't use existing ones: 
https://www.dynatrace.com/news/blog/hash4j-new-library-java/






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-09 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2081899976


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##
@@ -0,0 +1,201 @@
+public class UtilsTest {
+private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+private static final String FOO_TOPIC_NAME = "foo";
+private static final String BAR_TOPIC_NAME = "bar";
+private static final int FOO_NUM_PARTITIONS = 2;
+private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+.addRacks()
+.build();
+private static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();

Review Comment:
   Thanks. I am fine with taking `hash4j` as a dependency. It is a small one 
without too much risk but let's see what @ijuma and @chia7712 think about it 
too.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-09 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2081683481


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##
@@ -0,0 +1,201 @@
+public class UtilsTest {
+private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+private static final String FOO_TOPIC_NAME = "foo";
+private static final String BAR_TOPIC_NAME = "bar";
+private static final int FOO_NUM_PARTITIONS = 2;
+private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+.addRacks()
+.build();
+private static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();

Review Comment:
   Using streaming XXH3 seems pretty interesting to me given the results. Is 
`com.dynatrace.hash4j` the only way to get it? The library seems reasonable to 
be taken as a dependency on the server.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-09 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2081721119


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##
@@ -0,0 +1,201 @@
+public class UtilsTest {
+private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+private static final String FOO_TOPIC_NAME = "foo";
+private static final String BAR_TOPIC_NAME = "bar";
+private static final int FOO_NUM_PARTITIONS = 2;
+private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+.addRacks()
+.build();
+private static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();

Review Comment:
   From https://xxhash.com/, there are three Java libraries. Only 
zero-allocation-hashing and hash4j provide XXH3. However, only hash4j has a 
streaming hash class.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-09 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2081650493


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +336,106 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Convert each long hash value into a byte array.
+ * 3. Combine the sorted byte arrays to produce a final hash for the group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+// Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+(byte) value,
+(byte) (value >> 8),
+(byte) (value >> 16),
+(byte) (value >> 24),
+(byte) (value >> 32),
+(byte) (value >> 40),
+(byte) (value >> 48),
+(byte) (value >> 56)
+};
+
+// Combine the sorted topic hashes.
+byte[] resultBytes = new byte[8];
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey()) // sort by topic name
+.map(Map.Entry::getValue)
+.map(longToBytes::apply)
+.forEach(nextBytes -> {
+// Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+for (int i = 0; i < nextBytes.length; i++) {
+resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+}
+});
+
+return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0);
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by XXHash64.
+ * 
+ * The computed hash value for the topic is utilized in conjunction with 
the {@link #computeGroupHash(Map)}
+ * method and is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * It is important to note that if the hash algorithm is changed, the 
magic byte must be updated to reflect the
+ * new hash version.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Write a magic byte to denote the version of the hash function.
+ * 2. Write the hash code of the topic ID.
+ * 3. Write the UTF-8 encoded topic name.
+ * 4. Write the number of partitions associated with the topic.
+ * 5. For each partition, write the partition ID and a sorted list of rack 
identifiers.
+ *- Rack identifiers are formatted as "length1:value1,length2:value2" 
to prevent issues with simple separators.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) throws IOException {
+try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
+ DataOutputStream dos = new DataOutputStream(bbos)) {
+dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte
+dos.writeLong(topicImage.id().hashCode()); // topic ID
+dos.writeUTF(topicImage.name()); // topic name
+dos.writeInt(topicImage.partitions().size()); // number of 
partitions
+for (int i = 0; i < topicImage.partitions().size(); i++) {
+dos.writeInt(i); // partition id
+List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas)
+.mapToObj(clusterImage::broker)
+.filter(Objects::nonNull)
+.map(BrokerRegistration::rack)
+.filter(Optional::isPresent)
+.map(Optional::get)
+.sorted()
+.toList();
+
+// The rack string combination cannot use simple separator 
like ",", because there is no limitation for rack character.
+ 
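The combine step in this earlier revision (ported from Guava's `HashCode#asBytes` and `Hashing#combineOrdered`) is order-sensitive, which is why the topic hashes are sorted by name first. A stdlib-only sketch of that folding, with illustrative class and method names:

```java
public class CombineOrderedSketch {
    // Little-endian long-to-bytes, as in Guava HashCode#asBytes.
    static byte[] longToBytes(long value) {
        byte[] out = new byte[8];
        for (int i = 0; i < 8; i++) {
            out[i] = (byte) (value >> (8 * i));
        }
        return out;
    }

    // Ordered combination, as in Guava Hashing#combineOrdered:
    // result[i] = result[i] * 37 ^ next[i] for each incoming hash.
    static byte[] combineOrdered(long... hashes) {
        byte[] result = new byte[8];
        for (long hash : hashes) {
            byte[] next = longToBytes(hash);
            for (int i = 0; i < 8; i++) {
                result[i] = (byte) (result[i] * 37 ^ next[i]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The fold is order-sensitive: swapping the inputs changes the result,
        // so a deterministic input order (sorted topic names) is required.
        byte[] ab = combineOrdered(1L, 2L);
        byte[] ba = combineOrdered(2L, 1L);
        if (java.util.Arrays.equals(ab, ba)) throw new AssertionError();
        System.out.println("order matters");
    }
}
```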

Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-09 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2081623145



Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-05 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2073493044


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##
@@ -0,0 +1,201 @@
+public class UtilsTest {
+private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+private static final String FOO_TOPIC_NAME = "foo";
+private static final String BAR_TOPIC_NAME = "bar";
+private static final int FOO_NUM_PARTITIONS = 2;
+private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+.addRacks()
+.build();
+private static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();

Review Comment:
   Thanks for the suggestion. I benchmarked streaming XXH3 / streaming XXH64 / 
non-streaming XXH3 / non-streaming XXH64. Streaming XXH3 gives the best result. 
However, it requires adding the new library `com.dynatrace.hash4j`. Do we want 
to import it?
   
   cc @chia7712 @dajac 
   
   ```
   Benchmark                                      (numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  Mode  Cnt      Score      Error  Units
   TopicHashBenchmark.testDynatraceStreamingXXH3                      10                    10                    3  avgt    5    879.241 ±    6.788  ns/op
   TopicHashBenchmark.testDynatraceStreamingXXH3                      10                    50                    3  avgt    5   4192.380 ±  195.424  ns/op
   TopicHashBenchmark.testDynatraceStreamingXXH3                      10                   100                    3  avgt    5   8027.227 ±  210.403  ns/op
   TopicHashBenchmark.testDynatraceXXH3                               10                    10                    3  avgt    5   1676.398 ±    2.249  ns/op
   TopicHashBenchmark.testDynatraceXXH3                               10                    50                    3  avgt    5   9256.175 ±   45.298  ns/op
   TopicHashBenchmark.testDynatraceXXH3                               10                   100                    3  avgt    5  20195.772 ±   37.651  ns/op
   TopicHashBenchmark.testLz4StreamingXXHash64                        10                    10                    3  avgt    5   9739.833 ±  188.303  ns/op
   TopicHashBenchmark.testLz4StreamingXXHash64                        10                    50                    3  avgt    5  45540.195 ±  455.747  ns/op
   TopicHashBenchmark.testLz4StreamingXXHash64                        10                   100                    3  avgt    5  89084.689 ± 2164.862  ns/op
   TopicHashBenchmark.testLz4XXHash64                                 10                    10                    3  avgt    5   1755.391 ±    6.436  ns/op
   TopicHashBenchmark.testLz4XXHash64                                 10                    50                    3  avgt    5   9421.643 ±   79.838  ns/op
   TopicHashBenchmark.testLz4XXHash64                                 10                   100                    3  avgt    5  19461.960 ±  425.881  ns/op
   JMH benchmarks done
   ```
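The streaming variants above avoid serializing the topic's fields into an intermediate buffer before hashing. As a stdlib-only illustration of that difference (using `java.util.zip.CRC32` purely as a stand-in for XXH3/XXH64; the class and field names here are hypothetical), incremental `update` calls produce the same digest as hashing one concatenated buffer, without allocating it:

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class StreamingVsOneShot {
    // Streaming: feed fields to the hasher one at a time.
    static long streaming(long topicIdHash, int numPartitions) {
        CRC32 crc = new CRC32();
        crc.update(ByteBuffer.allocate(Long.BYTES).putLong(topicIdHash).array());
        crc.update(ByteBuffer.allocate(Integer.BYTES).putInt(numPartitions).array());
        return crc.getValue();
    }

    // One-shot: serialize everything into a buffer first, then hash once.
    static long oneShot(long topicIdHash, int numPartitions) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
            .putLong(topicIdHash)
            .putInt(numPartitions);
        CRC32 crc = new CRC32();
        crc.update(buf.array());
        return crc.getValue();
    }

    public static void main(String[] args) {
        // Both strategies see the same byte sequence, so they agree;
        // the streaming path simply skips the intermediate buffer.
        System.out.println(streaming(42L, 16) == oneShot(42L, 16)); // true
    }
}
```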
   
   
TopicHashBenchmark.java
   
   
   ```java
   package org.apache.kafka.jmh.metadata;
   
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.common.metadata.RegisterBrokerRecord;
   i

Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-01 Thread via GitHub


ijuma commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2070951806


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +336,106 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Convert each long hash value into a byte array.
+ * 3. Combine the sorted byte arrays to produce a final hash for the group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+    // Convert long to byte array. This is taken from guava LongHashCode#asBytes.
+    // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+    LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+        (byte) value,
+        (byte) (value >> 8),
+        (byte) (value >> 16),
+        (byte) (value >> 24),
+        (byte) (value >> 32),
+        (byte) (value >> 40),
+        (byte) (value >> 48),
+        (byte) (value >> 56)
+    };
+
+    // Combine the sorted topic hashes.
+    byte[] resultBytes = new byte[8];
+    topicHashes.entrySet()
+        .stream()
+        .sorted(Map.Entry.comparingByKey()) // sort by topic name
+        .map(Map.Entry::getValue)
+        .map(longToBytes::apply)
+        .forEach(nextBytes -> {
+            // Combine ordered hashes. This is taken from guava Hashing#combineOrdered.
+            // https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+            for (int i = 0; i < nextBytes.length; i++) {
+                resultBytes[i] = (byte) (resultBytes[i] * 37 ^ nextBytes[i]);
+            }
+        });
+
+    return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0);
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and partition racks by XXHash64.
+ * <p>
+ * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)}
+ * method and is stored as part of the metadata hash in the *GroupMetadataValue.
+ * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the
+ * new hash version.
+ * <p>
+ * The hashing process involves the following steps:
+ * 1. Write a magic byte to denote the version of the hash function.
+ * 2. Write the hash code of the topic ID.
+ * 3. Write the UTF-8 encoded topic name.
+ * 4. Write the number of partitions associated with the topic.
+ * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
+ *    - Rack identifiers are formatted as "length1:value1,length2:value2" to prevent issues with simple separators.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {
+    try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(512);
+         DataOutputStream dos = new DataOutputStream(bbos)) {
+        dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte
+        dos.writeLong(topicImage.id().hashCode()); // topic ID
+        dos.writeUTF(topicImage.name()); // topic name
+        dos.writeInt(topicImage.partitions().size()); // number of partitions
+        for (int i = 0; i < topicImage.partitions().size(); i++) {
+            dos.writeInt(i); // partition id
+            List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas)
+                .mapToObj(clusterImage::broker)
+                .filter(Objects::nonNull)
+                .map(BrokerRegistration::rack)
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .sorted()
+                .toList();
+
+            // The rack string combination cannot use a simple separator like ",",
+            // because there is no limitation on rack characters.

Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-01 Thread via GitHub


ijuma commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2070950389


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/UtilsTest.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.image.MetadataImage;
+
+import net.jpountz.xxhash.XXHash64;
+import net.jpountz.xxhash.XXHashFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class UtilsTest {
+private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+private static final String FOO_TOPIC_NAME = "foo";
+private static final String BAR_TOPIC_NAME = "bar";
+private static final int FOO_NUM_PARTITIONS = 2;
+private static final MetadataImage FOO_METADATA_IMAGE = new 
MetadataImageBuilder()
+.addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+.addRacks()
+.build();
+private static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();

Review Comment:
   `XXH3` seems to be the fastest implementation. Did we consider using that?
   
   https://github.com/Cyan4973/xxHash



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-01 Thread via GitHub


ijuma commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2070948145


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +336,106 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();
+
+/**
+ * Computes the hash of the topics in a group.
+ * 
+ * The computed hash value is stored as part of the metadata hash in the 
*GroupMetadataValue.
+ * 
+ * The hashing process involves the following steps:
+ * 1. Sort the topic hashes by topic name.
+ * 2. Convert each long hash value into a byte array.
+ * 3. Combine the sorted byte arrays to produce a final hash for the group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+// Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+(byte) value,
+(byte) (value >> 8),
+(byte) (value >> 16),
+(byte) (value >> 24),
+(byte) (value >> 32),
+(byte) (value >> 40),
+(byte) (value >> 48),
+(byte) (value >> 56)
+};
+
+// Combine the sorted topic hashes.
+byte[] resultBytes = new byte[8];
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey()) // sort by topic name
+.map(Map.Entry::getValue)
+.map(longToBytes::apply)
+.forEach(nextBytes -> {

Review Comment:
   I think we're adding a lot of unnecessary overhead for the hash computation 
(multiple map calls, etc.). We should probably just use an old school loop.
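A plain-loop version of the combine step, along the lines ijuma suggests, might look like the following standalone sketch. It stops at the combined 8 bytes rather than applying the final XXHash64 pass, since that needs the lz4 library; the class name is hypothetical:

```java
import java.util.Map;
import java.util.TreeMap;

public class GroupHashLoopSketch {
    // Combine topic hashes (sorted by topic name) into 8 bytes using the
    // same mixing as guava's Hashing#combineOrdered, with no streams and
    // no per-topic byte[] allocation.
    static byte[] combine(Map<String, Long> topicHashes) {
        byte[] result = new byte[8];
        // TreeMap iterates its values in key (topic name) order.
        for (long hash : new TreeMap<>(topicHashes).values()) {
            for (int i = 0; i < 8; i++) {
                byte next = (byte) (hash >> (8 * i)); // little-endian byte i
                result[i] = (byte) (result[i] * 37 ^ next);
            }
        }
        return result;
    }
}
```

In the PR's version, these 8 bytes would then be hashed once more with `LZ4_HASH_INSTANCE.hash(...)` to produce the final long.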






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-05-01 Thread via GitHub


squah-confluent commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2070408828


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+return Hashing.combineOrdered(
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey())
+.map(e -> HashCode.fromLong(e.getValue()))
+.toList()
+).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
+HashFunction hf = Hashing.murmur3_128();
+Hasher topicHasher = hf.newHasher()
+.putByte((byte) 0) // magic byte
+.putLong(topicImage.id().hashCode()) // topic Id
+.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+.putInt(topicImage.partitions().size()); // number of partitions
+
+
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry
 -> {
+topicHasher.putInt(entry.getKey()); // partition id
+String racks = Arrays.stream(entry.getValue().replicas)
+.mapToObj(clusterImage::broker)
+.filter(Objects::nonNull)
+.map(BrokerRegistration::rack)
+.filter(Optional::isPresent)
+.map(Optional::get)
+.sorted()
+.collect(Collectors.joining(";"));

Review Comment:
   Thanks for taking the suggestion. I think it's fine now.
   Small nit though, I was actually thinking of writing the length in binary, 
using `writeInt` and dropping the `:` and `,` separators entirely. Apologies if 
I wasn't clear enough earlier.
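The binary length-prefix idea can be sketched with plain `DataOutputStream` calls (a standalone illustration of the suggestion, not the PR's code; the class name is hypothetical):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class BinaryRackEncodingSketch {
    // Write each rack as a 4-byte length followed by its UTF-8 bytes,
    // so no textual separator is needed and no two rack lists can collide.
    static byte[] encode(List<String> sortedRacks) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(baos)) {
            dos.writeInt(sortedRacks.size()); // number of racks
            for (String rack : sortedRacks) {
                byte[] bytes = rack.getBytes(StandardCharsets.UTF_8);
                dos.writeInt(bytes.length); // length prefix replaces ':' and ','
                dos.write(bytes);
            }
        }
        return baos.toByteArray();
    }
}
```

Because both the list size and each string length are written in binary, rack names may contain any character without creating ambiguity.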






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-30 Thread via GitHub


chia7712 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2069288073


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +337,87 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+// Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+(byte) value,
+(byte) (value >> 8),
+(byte) (value >> 16),
+(byte) (value >> 24),
+(byte) (value >> 32),
+(byte) (value >> 40),
+(byte) (value >> 48),
+(byte) (value >> 56)
+};
+
+// Combine the sorted topic hashes.
+byte[] resultBytes = new byte[8];
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey()) // sort by topic name
+.map(Map.Entry::getValue)
+.map(longToBytes::apply)
+.forEach(nextBytes -> {
+// Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+for (int i = 0; i < nextBytes.length; i++) {
+resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+}
+});
+
+return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0);
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by XXHash64.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {

Review Comment:
   > If the hashing function is ever changed, is there a version field that 
should be updated?
   
   yes, there is a magic byte as version. 
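Since the magic byte is the first byte fed to the hasher, bumping it changes every persisted hash. A minimal sketch of that property, using `java.util.zip.CRC32` as a stand-in hash and a hypothetical future magic value:

```java
import java.util.zip.CRC32;

public class MagicByteVersionSketch {
    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;      // current version
    static final byte HYPOTHETICAL_V1_MAGIC_BYTE = 0x01; // a future bump

    // Only the leading magic byte differs between the two computations,
    // which is enough to change the resulting hash.
    static long hashWithMagic(byte magic, byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(magic);
        crc.update(payload);
        return crc.getValue();
    }
}
```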






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-30 Thread via GitHub


squah-confluent commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2069188025


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+return Hashing.combineOrdered(
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey())
+.map(e -> HashCode.fromLong(e.getValue()))
+.toList()
+).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
+HashFunction hf = Hashing.murmur3_128();
+Hasher topicHasher = hf.newHasher()
+.putByte((byte) 0) // magic byte
+.putLong(topicImage.id().hashCode()) // topic Id
+.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+.putInt(topicImage.partitions().size()); // number of partitions
+
+
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry
 -> {
+topicHasher.putInt(entry.getKey()); // partition id
+String racks = Arrays.stream(entry.getValue().replicas)
+.mapToObj(clusterImage::broker)
+.filter(Objects::nonNull)
+.map(BrokerRegistration::rack)
+.filter(Optional::isPresent)
+.map(Optional::get)
+.sorted()
+.collect(Collectors.joining(";"));

Review Comment:
   I think this is fine for preventing accidental collisions. It's still 
possible to _intentionally_ come up with rack names that create collisions, but 
I believe you'd only be impacting your own cluster.
   
   To rule out any ambiguity, we could pretend this was a serialization format 
and either prefix strings with their length, or null-terminate them. The same 
for variable-length lists of strings: these can either be length-prefixed or 
terminated with an invalid string that cannot occur (""? but not sure on this).




Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-30 Thread via GitHub


squah-confluent commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2069190560


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +337,87 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+// Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+(byte) value,
+(byte) (value >> 8),
+(byte) (value >> 16),
+(byte) (value >> 24),
+(byte) (value >> 32),
+(byte) (value >> 40),
+(byte) (value >> 48),
+(byte) (value >> 56)
+};
+
+// Combine the sorted topic hashes.
+byte[] resultBytes = new byte[8];
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey()) // sort by topic name
+.map(Map.Entry::getValue)
+.map(longToBytes::apply)
+.forEach(nextBytes -> {
+// Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+for (int i = 0; i < nextBytes.length; i++) {
+resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+}
+});
+
+return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0);
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by XXHash64.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {

Review Comment:
   If the hashing function is ever changed, is there a version field that 
should be updated?






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-30 Thread via GitHub


squah-confluent commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2069188025


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+return Hashing.combineOrdered(
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey())
+.map(e -> HashCode.fromLong(e.getValue()))
+.toList()
+).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
+HashFunction hf = Hashing.murmur3_128();
+Hasher topicHasher = hf.newHasher()
+.putByte((byte) 0) // magic byte
+.putLong(topicImage.id().hashCode()) // topic Id
+.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+.putInt(topicImage.partitions().size()); // number of partitions
+
+
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry
 -> {
+topicHasher.putInt(entry.getKey()); // partition id
+String racks = Arrays.stream(entry.getValue().replicas)
+.mapToObj(clusterImage::broker)
+.filter(Objects::nonNull)
+.map(BrokerRegistration::rack)
+.filter(Optional::isPresent)
+.map(Optional::get)
+.sorted()
+.collect(Collectors.joining(";"));

Review Comment:
   I think this is fine for preventing accidental collisions. It's still 
possible to _intentionally_ come up with rack names that create collisions, but 
I believe you'd only be impacting your own cluster.
   
   To rule out any ambiguity, I'd strongly prefer if we pretended this was a 
serialization format and either prefixed strings with their length, or 
null-terminated them. The same for variable-length lists of strings. These can 
either be length-prefixed or terminated with an invalid string that cannot 
occur (""? but not sure on this).






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-30 Thread via GitHub


chia7712 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2068885949


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -324,4 +337,87 @@ static void throwIfRegularExpressionIsInvalid(
 regex, ex.getDescription()));
 }
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+static final XXHash64 LZ4_HASH_INSTANCE = 
XXHashFactory.fastestInstance().hash64();
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+// Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+(byte) value,
+(byte) (value >> 8),
+(byte) (value >> 16),
+(byte) (value >> 24),
+(byte) (value >> 32),
+(byte) (value >> 40),
+(byte) (value >> 48),
+(byte) (value >> 56)
+};
+
+// Combine the sorted topic hashes.
+byte[] resultBytes = new byte[8];
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey()) // sort by topic name
+.map(Map.Entry::getValue)
+.map(longToBytes::apply)
+.forEach(nextBytes -> {
+// Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+for (int i = 0; i < nextBytes.length; i++) {
+resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+}
+});
+
+return LZ4_HASH_INSTANCE.hash(resultBytes, 0, resultBytes.length, 0);
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by XXHash64.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) throws IOException {

Review Comment:
   Please add documentation to remind developers that the hash is stored as 
part of the state. Changing the implementation of the hashing function may 
break compatibility with existing states.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-30 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2068862253


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +221,90 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+// Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+(byte) value,
+(byte) (value >> 8),
+(byte) (value >> 16),
+(byte) (value >> 24),
+(byte) (value >> 32),
+(byte) (value >> 40),
+(byte) (value >> 48),
+(byte) (value >> 56)
+};
+
+// Combine the sorted topic hashes.
+byte[] resultBytes = new byte[8];
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey()) // sort by topic name
+.map(Map.Entry::getValue)
+.map(longToBytes::apply)
+.forEach(nextBytes -> {
+// Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+for (int i = 0; i < nextBytes.length; i++) {
+resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+}
+});
+
+// Convert the byte array to long. This is taken from guava 
BytesHashCode#asLong.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295
+long retVal = (resultBytes[0] & 0xFF);
+for (int i = 1; i < resultBytes.length; i++) {
+retVal |= (resultBytes[i] & 0xFFL) << (i * 8);
+}
+return retVal;
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) throws IOException {
+try (ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review Comment:
   Sorry, I misunderstood ByteBufferOutputStream. I thought it used a fixed 
capacity and would fail when the buffer was not large enough. After checking the 
source code, I see it expands the buffer when it is not big enough. Updated it. Thanks.
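   For illustration, the JDK's ByteArrayOutputStream used in the quoted
computeTopicHash shows the same growable behavior: writes past the initial
capacity hint expand the buffer instead of failing. A minimal sketch (the
topic name and field values are made up):

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;

   public class GrowableBufferSketch {
       public static void main(String[] args) throws IOException {
           // Start with a deliberately tiny capacity hint of 16 bytes.
           ByteArrayOutputStream baos = new ByteArrayOutputStream(16);
           try (DataOutputStream dos = new DataOutputStream(baos)) {
               dos.writeByte(0x00);  // magic byte, as in the quoted code
               dos.writeLong(42L);   // stand-in for topicImage.id().hashCode()
               dos.writeUTF("a-topic-name-longer-than-sixteen-bytes"); // made up
               dos.writeInt(3);      // stand-in partition count
           }
           byte[] out = baos.toByteArray();
           // The stream grew past the 16-byte hint instead of failing:
           // 1 (byte) + 8 (long) + 2 + 38 (writeUTF) + 4 (int) = 53 bytes.
           if (out.length != 53) throw new AssertionError(out.length);
           System.out.println("ok");
       }
   }
   ```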






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-29 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066485713


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +221,90 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+// Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+(byte) value,
+(byte) (value >> 8),
+(byte) (value >> 16),
+(byte) (value >> 24),
+(byte) (value >> 32),
+(byte) (value >> 40),
+(byte) (value >> 48),
+(byte) (value >> 56)
+};
+
+// Combine the sorted topic hashes.
+byte[] resultBytes = new byte[8];
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey()) // sort by topic name
+.map(Map.Entry::getValue)
+.map(longToBytes::apply)
+.forEach(nextBytes -> {
+// Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+for (int i = 0; i < nextBytes.length; i++) {
+resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+}
+});
+
+// Convert the byte array to long. This is taken from guava 
BytesHashCode#asLong.

Review Comment:
   Yes, we can use it. Thanks.
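   For context, the manual byte-to-long loop in the quoted snippet is
equivalent to reading the array as a little-endian long via ByteBuffer, which
is presumably the simplification agreed on here. A sketch (not the PR's final
code):

   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;

   public class AsLongSketch {
       public static void main(String[] args) {
           byte[] bytes = {(byte) 0xEF, (byte) 0xCD, (byte) 0xAB, (byte) 0x89,
                           (byte) 0x67, (byte) 0x45, (byte) 0x23, (byte) 0x01};
           // Manual little-endian reconstruction, as in the quoted snippet.
           long manual = bytes[0] & 0xFF;
           for (int i = 1; i < bytes.length; i++) {
               manual |= (bytes[i] & 0xFFL) << (i * 8);
           }
           // The equivalent JDK one-liner.
           long viaBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong();
           if (manual != viaBuffer) throw new AssertionError();
           if (manual != 0x0123456789ABCDEFL) throw new AssertionError();
           System.out.println("ok");
       }
   }
   ```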






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-29 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066476357


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +221,90 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();

Review Comment:
   Move to `org.apache.kafka.coordinator.group.Utils`. Thanks.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +221,90 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+// Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+(byte) value,
+(byte) (value >> 8),
+(byte) (value >> 16),
+(byte) (value >> 24),
+(byte) (value >> 32),
+(byte) (value >> 40),
+(byte) (value >> 48),
+(byte) (value >> 56)
+};
+
+// Combine the sorted topic hashes.
+byte[] resultBytes = new byte[8];
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey()) // sort by topic name
+.map(Map.Entry::getValue)
+.map(longToBytes::apply)
+.forEach(nextBytes -> {
+// Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+for (int i = 0; i < nextBytes.length; i++) {
+resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+}
+});
+
+// Convert the byte array to long. This is taken from guava 
BytesHashCode#asLong.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295
+long retVal = (resultBytes[0] & 0xFF);
+for (int i = 1; i < resultBytes.length; i++) {
+retVal |= (resultBytes[i] & 0xFFL) << (i * 8);
+}
+return retVal;
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.

Review Comment:
   Updated it. Thanks.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-29 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066410256


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +221,90 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+// Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+(byte) value,
+(byte) (value >> 8),
+(byte) (value >> 16),
+(byte) (value >> 24),
+(byte) (value >> 32),
+(byte) (value >> 40),
+(byte) (value >> 48),
+(byte) (value >> 56)
+};
+
+// Combine the sorted topic hashes.
+byte[] resultBytes = new byte[8];
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey()) // sort by topic name
+.map(Map.Entry::getValue)
+.map(longToBytes::apply)
+.forEach(nextBytes -> {
+// Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+for (int i = 0; i < nextBytes.length; i++) {
+resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+}
+});
+
+// Convert the byte array to long. This is taken from guava 
BytesHashCode#asLong.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295
+long retVal = (resultBytes[0] & 0xFF);
+for (int i = 1; i < resultBytes.length; i++) {
+retVal |= (resultBytes[i] & 0xFFL) << (i * 8);
+}
+return retVal;
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) throws IOException {
+try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos)) {
+dos.writeByte(TOPIC_HASH_MAGIC_BYTE); // magic byte
+dos.writeLong(topicImage.id().hashCode()); // topic ID
+dos.writeUTF(topicImage.name()); // topic name
+dos.writeInt(topicImage.partitions().size()); // number of 
partitions
+for (int i = 0; i < topicImage.partitions().size(); i++) {
+dos.writeInt(i); // partition id
+List<String> sortedRacksList = 
Arrays.stream(topicImage.partitions().get(i).replicas)
+.mapToObj(clusterImage::broker)
+.filter(Objects::nonNull)
+.map(BrokerRegistration::rack)
+.filter(Optional::isPresent)
+.map(Optional::get)
+.sorted()
+.toList();
+
+String racks = IntStream.range(0, sortedRacksList.size())

Review Comment:
   There is no restriction on the rack string, so any character can appear in 
it. I can update the KIP if needed.
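   To illustrate why unrestricted rack strings matter when concatenating them:
without a delimiter or length prefix, distinct rack lists can produce
identical encodings. A hypothetical sketch (not the PR's code; writeUTF-style
length prefixing is one way to remove the ambiguity):

   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   public class RackEncodingSketch {
       // Naive concatenation: distinct rack lists can collide.
       static String naive(List<String> racks) {
           return String.join("", racks);
       }

       // Length-prefixing each rack removes the ambiguity, regardless of
       // which characters appear in the rack strings.
       static String lengthPrefixed(List<String> racks) {
           return racks.stream()
                   .map(r -> r.length() + ":" + r)
                   .collect(Collectors.joining());
       }

       public static void main(String[] args) {
           List<String> x = List.of("a", "bc");
           List<String> y = List.of("ab", "c");
           // Naive encodings collide ("abc" == "abc")...
           if (!naive(x).equals(naive(y))) throw new AssertionError();
           // ...while length-prefixed encodings stay distinct.
           if (lengthPrefixed(x).equals(lengthPrefixed(y))) throw new AssertionError();
           System.out.println("ok");
       }
   }
   ```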






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-29 Thread via GitHub


chia7712 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066374101


##
gradle/dependencies.gradle:
##
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   > The ByteBufferOutputStream needs a ByteBuffer with a given capacity. I 
wonder whether we can calculate an accurate capacity. For example, the rack 
string can contain any character, which takes 1 to 4 bytes in UTF-8.
   
   The initial capacity can be discussed later. In fact, it may not be an issue 
if we adopt a growable buffer: the buffer eventually becomes big enough for 
each hash computation.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-29 Thread via GitHub


chia7712 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066288123


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +221,90 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+// Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+(byte) value,
+(byte) (value >> 8),
+(byte) (value >> 16),
+(byte) (value >> 24),
+(byte) (value >> 32),
+(byte) (value >> 40),
+(byte) (value >> 48),
+(byte) (value >> 56)
+};
+
+// Combine the sorted topic hashes.
+byte[] resultBytes = new byte[8];
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey()) // sort by topic name
+.map(Map.Entry::getValue)
+.map(longToBytes::apply)
+.forEach(nextBytes -> {
+// Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+for (int i = 0; i < nextBytes.length; i++) {
+resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+}
+});
+
+// Convert the byte array to long. This is taken from guava 
BytesHashCode#asLong.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L279-L295
+long retVal = (resultBytes[0] & 0xFF);
+for (int i = 1; i < resultBytes.length; i++) {
+retVal |= (resultBytes[i] & 0xFFL) << (i * 8);
+}
+return retVal;
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.

Review Comment:
   please update the docs



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +221,90 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+// Convert long to byte array. This is taken from guava 
LongHashCode#asBytes.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/HashCode.java#L187-L199
+LongFunction<byte[]> longToBytes = (long value) -> new byte[] {
+(byte) value,
+(byte) (value >> 8),
+(byte) (value >> 16),
+(byte) (value >> 24),
+(byte) (value >> 32),
+(byte) (value >> 40),
+(byte) (value >> 48),
+(byte) (value >> 56)
+};
+
+// Combine the sorted topic hashes.
+byte[] resultBytes = new byte[8];
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey()) // sort by topic name
+.map(Map.Entry::getValue)
+.map(longToBytes::apply)
+.forEach(nextBytes -> {
+// Combine ordered hashes. This is taken from guava 
Hashing#combineOrdered.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/guava/src/com/google/common/hash/Hashing.java#L689-L712
+for (int i = 0; i < nextBytes.length; i++) {
+resultBytes[i] = (byte) (resultBytes[i] * 37 ^ 
nextBytes[i]);
+}
+});
+
+// Convert the byte array to long. This is taken from guava 
BytesHashCode#asLong.
+// 
https://github.com/google/guava/blob/bdf2a9d05342fca852645278d474082905e09d94/gua

Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-29 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066325978


##
gradle/dependencies.gradle:
##
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   > I also wonder what is the impact of putting all the data to a byte array 
before hashing it. Do you have thoughts on this?
   
   Based on KIP-1101, this minimizes how often a topic hash is computed: the 
result can be shared between groups. I think we can keep this function simple 
for now.
   
   > I suggest that EventProcessorThread can leverage GrowableBufferSupplier to 
reuse buffer as much as possible. 
   
   With a BufferSupplier, the hash function would need to be thread-safe in 
order to reuse the buffer. We can revisit this in the future.
   
   > Additionally, Group#computeTopicHashin should use ByteBufferOutputStream 
to generate the bytes array, as ByteBufferOutputStream#buffer#array can avoid 
extra array copy like ByteArrayOutputStream#toByteArray
   
   The ByteBufferOutputStream needs a ByteBuffer with a given capacity. I 
wonder whether we can calculate an accurate capacity. For example, the rack 
string can contain any character, which takes 1 to 4 bytes in UTF-8.
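   The sharing of topic hashes between groups described in KIP-1101 can be
sketched as a simple per-topic cache (names and structure hypothetical; the
real coordinator code may organize this differently):

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Function;

   public class TopicHashCacheSketch {
       // Hypothetical cache keyed by topic name: each topic's hash is
       // computed once and reused by every group subscribed to the topic.
       private final Map<String, Long> cache = new ConcurrentHashMap<>();
       private int computations = 0;

       long topicHash(String topic, Function<String, Long> compute) {
           return cache.computeIfAbsent(topic, t -> {
               computations++; // count actual hash computations
               return compute.apply(t);
           });
       }

       public static void main(String[] args) {
           TopicHashCacheSketch c = new TopicHashCacheSketch();
           Function<String, Long> expensive = t -> (long) t.hashCode();
           c.topicHash("orders", expensive); // computed
           c.topicHash("orders", expensive); // served from cache
           // Two lookups, one computation: the hash is shared.
           if (c.computations != 1) throw new AssertionError();
           System.out.println("ok");
       }
   }
   ```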






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-29 Thread via GitHub


chia7712 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066249125


##
gradle/dependencies.gradle:
##
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   > I agree with using a BufferSupplier in order to reuse buffers. However, 
EventProcessorThread may be too low level to hold it. Having it in Shard may be 
enough.
   
   We can revisit this when the critical code is used in production :)
   
   @FrankYang0529 thanks for the updates. The result LGTM. 






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-29 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066166230


##
gradle/dependencies.gradle:
##
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   Thanks for the suggestion. Updated the benchmark results.
   
   ```
   Benchmark                      (numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  Mode  Cnt     Score    Error  Units
   TopicHashBenchmark.testLz4                         10                    10                    3  avgt   15   166.389 ±  1.542  ns/op
   TopicHashBenchmark.testLz4                         10                    50                    3  avgt   15   375.660 ±  2.771  ns/op
   TopicHashBenchmark.testLz4                         10                   100                    3  avgt   15   636.176 ±  8.305  ns/op
   TopicHashBenchmark.testMurmur                      10                    10                    3  avgt   15   238.242 ±  1.664  ns/op
   TopicHashBenchmark.testMurmur                      10                    50                    3  avgt   15  1143.583 ±  5.981  ns/op
   TopicHashBenchmark.testMurmur                      10                   100                    3  avgt   15  2278.680 ± 29.007  ns/op
   ```
   
   
   
TopicHashBenchmark.java
   
   
   ```java
   package org.apache.kafka.jmh.metadata;
   
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.common.metadata.RegisterBrokerRecord;
   import org.apache.kafka.image.ClusterDelta;
   import org.apache.kafka.image.ClusterImage;
   import org.apache.kafka.image.TopicsDelta;
   import org.apache.kafka.image.TopicImage;
   import org.apache.kafka.metadata.BrokerRegistration;
   import org.apache.kafka.streams.state.internals.Murmur3;
   
   import net.jpountz.xxhash.XXHash64;
   import net.jpountz.xxhash.XXHashFactory;
   
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.BenchmarkMode;
   import org.openjdk.jmh.annotations.Fork;
   import org.openjdk.jmh.annotations.Level;
   import org.openjdk.jmh.annotations.Measurement;
   import org.openjdk.jmh.annotations.Mode;
   import org.openjdk.jmh.annotations.OutputTimeUnit;
   import org.openjdk.jmh.annotations.Param;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;
   import org.openjdk.jmh.annotations.Warmup;
   
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.util.Arrays;
   import java.util.List;
   import java.util.Objects;
   import java.util.Optional;
   import java.util.concurrent.TimeUnit;
   import java.util.stream.Collectors;
   import java.util.stream.IntStream;
   
   import static 
org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta;
   import static 
org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getNumBrokers;
   
   @State(Scope.Benchmark)
   @Fork(value = 1)
   @Warmup(iterations = 5)
   @Measurement(iterations = 15)
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.NANOSECONDS)
   public class TopicHashBenchmark {
   @Param({"10", "50", "100"})
   private int partitionsPerTopic;
   @Param({"3"})
   private int replicationFactor;
   @Param({"10"})
   private int numReplicasPerBroker;
   
   private byte[] topicBytes;
   
   @Setup(Level.Trial)
   public void setup() throws IOException {
   TopicsDelta topicsDelta = getInitialTopicsDelta(1, 
partitionsPerTopic, replicationFactor, numReplicasPerBroker);
   int numBrokers = getNumBrokers(1, partitionsPerTopic, 
replicationFactor, numReplicasPerBroker);
   ClusterDelta clusterDelta = new ClusterDelta(ClusterImage.EMPTY);
   for (int i = 0; i < numBrokers; i++) {
   clusterDelta.replay(new RegisterBrokerRecord()
   .setBrokerId(i)
   .setRack(Uuid.randomUuid().toString())
   );
   }
   TopicImage topicImage = 
topicsDelta.apply().topicsById().values().stream().findFirst().get();
   ClusterImage clusterImage = clusterDelta.apply();
   
   try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos)) {
   dos.writeByte(0); // magic byte
   dos.writeLong(topicImage.id().hashCode()); // topic ID
   dos.writeUTF(topicImage.name()); // topic name
   dos.writeInt(topicImage.partitions().size()); // number of 
partitions
   for (int i = 0; i < topicImage.partitions().size(); i++) {

Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-29 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066162858


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +221,90 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * The magic byte used to identify the version of topic hash function.
+ */
+byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+XXHash64 LZ4_HASH_INSTANCE = XXHashFactory.fastestInstance().hash64();

Review Comment:
   Would it be possible to put these and the new methods into a separate class? 
Having them in `Group` is odd because they won't be used by all the group types.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-29 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066160108


##
gradle/dependencies.gradle:
##
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   > IIRC, lz4-java is not maintained anymore, so I am not sure whether code 
maintenance is a risk.
   > 
   > > I also wonder what is the impact of putting all the data to a byte array 
before hashing it. Do you have thoughts on this?
   > 
   > I suggest that `EventProcessorThread` can leverage 
`GrowableBufferSupplier` to reuse buffer as much as possible. Additionally, 
`Group#computeTopicHashin` should use `ByteBufferOutputStream` to generate the 
bytes array, as `ByteBufferOutputStream#buffer#array` can avoid extra array 
copy like `ByteArrayOutputStream#toByteArray`
   
   I agree with using a BufferSupplier in order to reuse buffers. However, 
EventProcessorThread may be too low level to hold it. Having it in Shard may be 
enough.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-29 Thread via GitHub


chia7712 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066122048


##
gradle/dependencies.gradle:
##
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   @FrankYang0529 could you please move `baos.toByteArray()` out of benchmark? 






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-29 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066016813


##
gradle/dependencies.gradle:
##
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   Based on the benchmark results, lz4 performs better in our case. I will 
switch to it.
   
   ```
   Benchmark                      (numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  Mode  Cnt     Score    Error  Units
   TopicHashBenchmark.testLz4                         10                    10                    3  avgt   15   194.553 ±  1.631  ns/op
   TopicHashBenchmark.testLz4                         10                    50                    3  avgt   15   484.640 ±  1.721  ns/op
   TopicHashBenchmark.testLz4                         10                   100                    3  avgt   15   883.435 ±  4.001  ns/op
   TopicHashBenchmark.testMurmur                      10                    10                    3  avgt   15   205.529 ±  0.701  ns/op
   TopicHashBenchmark.testMurmur                      10                    50                    3  avgt   15  1066.528 ± 42.856  ns/op
   TopicHashBenchmark.testMurmur                      10                   100                    3  avgt   15  2082.821 ± 10.935  ns/op
   ```
   
   
   
TopicHashBenchmark.java
   
   
   ```java
   package org.apache.kafka.jmh.metadata;
   
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.common.internals.Murmur3;
   import org.apache.kafka.common.metadata.RegisterBrokerRecord;
   import org.apache.kafka.image.ClusterDelta;
   import org.apache.kafka.image.ClusterImage;
   import org.apache.kafka.image.TopicsDelta;
   import org.apache.kafka.image.TopicImage;
   import org.apache.kafka.metadata.BrokerRegistration;
   
   import net.jpountz.xxhash.XXHash64;
   import net.jpountz.xxhash.XXHashFactory;
   
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.BenchmarkMode;
   import org.openjdk.jmh.annotations.Fork;
   import org.openjdk.jmh.annotations.Level;
   import org.openjdk.jmh.annotations.Measurement;
   import org.openjdk.jmh.annotations.Mode;
   import org.openjdk.jmh.annotations.OutputTimeUnit;
   import org.openjdk.jmh.annotations.Param;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;
   import org.openjdk.jmh.annotations.TearDown;
   import org.openjdk.jmh.annotations.Warmup;
   
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.util.Arrays;
   import java.util.List;
   import java.util.Objects;
   import java.util.Optional;
   import java.util.concurrent.TimeUnit;
   import java.util.stream.Collectors;
   import java.util.stream.IntStream;
   
   import static 
org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta;
   import static 
org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getNumBrokers;
   
   @State(Scope.Benchmark)
   @Fork(value = 1)
   @Warmup(iterations = 5)
   @Measurement(iterations = 15)
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.NANOSECONDS)
public class TopicHashBenchmark {
    @Param({"10", "50", "100"})
    private int partitionsPerTopic;
    @Param({"3"})
    private int replicationFactor;
    @Param({"10"})
    private int numReplicasPerBroker;

    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    private final DataOutputStream dos = new DataOutputStream(baos);

    @Setup(Level.Trial)
    public void setup() throws IOException {
        TopicsDelta topicsDelta = getInitialTopicsDelta(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
        int numBrokers = getNumBrokers(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
        ClusterDelta clusterDelta = new ClusterDelta(ClusterImage.EMPTY);
        for (int i = 0; i < numBrokers; i++) {
            clusterDelta.replay(new RegisterBrokerRecord()
                .setBrokerId(i)
                .setRack(Uuid.randomUuid().toString())
            );
        }
        TopicImage topicImage = topicsDelta.apply().topicsById().values().stream().findFirst().get();
        ClusterImage clusterImage = clusterDelta.apply();

        dos.writeByte(0); // magic byte
        dos.writeLong(topicImage.id().hashCode()); // topic ID
        dos.writeUTF(topicImage.name()); // topic name
        dos.writeInt(topicImage.partitions().size()); // number of partitions
        for (int i = 0; i < topicImag

Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-28 Thread via GitHub


ijuma commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2064071619


##
gradle/dependencies.gradle:
##
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   This hash function is used by zstd too. It's pretty safe to rely on it, given 
that lz4 and zstd are the most popular compression algorithms, and we will be 
supporting them for the foreseeable future.
   
   Which particular implementation we use is a fair question.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-28 Thread via GitHub


chia7712 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2064046103


##
gradle/dependencies.gradle:
##
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   IIRC, lz4-java is not maintained anymore, so I'm not sure whether maintaining 
that code is a risk. 
   
   > I also wonder what is the impact of putting all the data to a byte array 
before hashing it. Do you have thoughts on this?
   
   I suggest that `EventProcessorThread` can leverage `GrowableBufferSupplier` 
to reuse buffers as much as possible. Additionally, `Group#computeTopicHash` 
should use `ByteBufferOutputStream` to generate the byte array, as 
`ByteBufferOutputStream#buffer#array` avoids the extra array copy that 
`ByteArrayOutputStream#toByteArray` performs.
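
The class referenced above is Kafka's internal `ByteBufferOutputStream`, which is not shown here; the sketch below uses only plain JDK types to illustrate the copy-avoidance point — `ByteArrayOutputStream#toByteArray` allocates a fresh copy on every call, whereas a heap `ByteBuffer` exposes its backing array directly:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch (not Kafka's ByteBufferOutputStream): shows why handing a
// hasher the buffer's backing array avoids the copy that toByteArray() makes.
public class BufferCopySketch {
    public static void main(String[] args) {
        byte[] payload = "topic-name".getBytes(StandardCharsets.UTF_8);

        // ByteArrayOutputStream#toByteArray allocates and copies a new array each call.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.write(payload, 0, payload.length);
        System.out.println(baos.toByteArray() != baos.toByteArray()); // true: two distinct copies

        // A heap ByteBuffer exposes its backing array without copying; a hasher can
        // consume array() over the range [0, position()).
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(payload);
        System.out.println(buf.array() == buf.array());          // true: same array, no copy
        System.out.println(buf.position() == payload.length);    // true: valid bytes end at position()
    }
}
```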






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-28 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2063449780


##
gradle/dependencies.gradle:
##
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   Thanks!
   
   While talking to @ijuma about it, he suggested looking into 
https://github.com/lz4/lz4-java/tree/master/src/java/net/jpountz/xxhash. We get 
it via lz4 and it is apparently much faster than Murmur3. It may be worth 
running a few benchmarks to compare them. What do you think?
   
   I also wonder what is the impact of putting all the data to a byte array 
before hashing it. Do you have thoughts on this?






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-28 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2063441471


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+return Hashing.combineOrdered(
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey())
+.map(e -> HashCode.fromLong(e.getValue()))
+.toList()
+).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) {
+HashFunction hf = Hashing.murmur3_128();
+Hasher topicHasher = hf.newHasher()
+.putByte((byte) 0) // magic byte

Review Comment:
   Yes, I added `TOPIC_HASH_MAGIC_BYTE`.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-28 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2063441134


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+return Hashing.combineOrdered(
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey())
+.map(e -> HashCode.fromLong(e.getValue()))
+.toList()
+).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) {
+HashFunction hf = Hashing.murmur3_128();
+Hasher topicHasher = hf.newHasher()
+.putByte((byte) 0) // magic byte
+.putLong(topicImage.id().hashCode()) // topic Id
+.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+.putInt(topicImage.partitions().size()); // number of partitions
+
+
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry
 -> {

Review Comment:
   Good suggestion! Thanks. Updated it.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-28 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2063440444


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+return Hashing.combineOrdered(
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey())
+.map(e -> HashCode.fromLong(e.getValue()))
+.toList()
+).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) {
+HashFunction hf = Hashing.murmur3_128();
+Hasher topicHasher = hf.newHasher()
+.putByte((byte) 0) // magic byte
+.putLong(topicImage.id().hashCode()) // topic Id
+.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+.putInt(topicImage.partitions().size()); // number of partitions
+
+
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry
 -> {
+topicHasher.putInt(entry.getKey()); // partition id
+String racks = Arrays.stream(entry.getValue().replicas)
+.mapToObj(clusterImage::broker)
+.filter(Objects::nonNull)
+.map(BrokerRegistration::rack)
+.filter(Optional::isPresent)
+.map(Optional::get)
+.sorted()
+.collect(Collectors.joining(";"));

Review Comment:
   It looks like any character can be valid. I changed the combined string to 
the following format:
   
   ```
   0:,1:, ...
   ```






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-28 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2063438718


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {

Review Comment:
   > I wonder whether it is worth inlining the implementation from Guava or 
something similar to combine the hashes. It would avoid the extra collections.
   
   Yes, I copied some of the implementation into this function.
   
   > In the KIP, you also mentioned combining the index with the hash. Is this 
something done within `combineOrdered`?
   
   No, `computeGroupHash` sorts topics by name and uses this order to merge the 
hashes. I also added the test cases `testComputeGroupHashWithDifferentOrder` and 
`testComputeGroupHashWithSameKeyButDifferentValue` to verify it.
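
The order-sensitivity described above can be sketched as follows. This is an illustrative mix only (not the Murmur3/`combineOrdered` implementation used in the PR): per-topic hashes are combined in topic-name order, so map insertion order is irrelevant, but swapping which topic owns which hash changes the result.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch: order-sensitive combination of per-topic hashes.
// The multiplier is an arbitrary odd 64-bit constant, not the PR's hash function.
public class GroupHashSketch {
    static long computeGroupHash(Map<String, Long> topicHashes) {
        long hash = 0;
        // TreeMap iterates sorted by topic name, fixing the combination order.
        for (Map.Entry<String, Long> e : new TreeMap<>(topicHashes).entrySet()) {
            hash = hash * 0x9E3779B97F4A7C15L + e.getValue();
        }
        return hash;
    }

    public static void main(String[] args) {
        Map<String, Long> a = Map.of("foo", 1L, "bar", 2L);
        Map<String, Long> b = Map.of("bar", 2L, "foo", 1L); // same mapping, different order
        Map<String, Long> c = Map.of("foo", 2L, "bar", 1L); // values swapped
        System.out.println(computeGroupHash(a) == computeGroupHash(b)); // true
        System.out.println(computeGroupHash(a) == computeGroupHash(c)); // false
    }
}
```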






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-28 Thread via GitHub


FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2063434753


##
gradle/dependencies.gradle:
##
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   I updated the PR to remove Guava. I think we can put all the data into a byte 
array and use Murmur3 to hash it, so we don't rely on Guava.






Re: [PR] KAFKA-17747: [2/N] Add compute topic and group hash [kafka]

2025-04-25 Thread via GitHub


dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2059807814


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+return Hashing.combineOrdered(
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey())
+.map(e -> HashCode.fromLong(e.getValue()))
+.toList()
+).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) {
+HashFunction hf = Hashing.murmur3_128();
+Hasher topicHasher = hf.newHasher()
+.putByte((byte) 0) // magic byte
+.putLong(topicImage.id().hashCode()) // topic Id
+.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+.putInt(topicImage.partitions().size()); // number of partitions
+
+
topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry
 -> {
+topicHasher.putInt(entry.getKey()); // partition id
+String racks = Arrays.stream(entry.getValue().replicas)
+.mapToObj(clusterImage::broker)
+.filter(Objects::nonNull)
+.map(BrokerRegistration::rack)
+.filter(Optional::isPresent)
+.map(Optional::get)
+.sorted()
+.collect(Collectors.joining(";"));

Review Comment:
   `;` is allowed in the `rack` field too, so it doesn't really protect us.
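
The ambiguity can be shown in two lines (the rack names below are hypothetical): when the separator character may itself appear inside a rack string, two different rack lists collapse to the same joined string, so their hashes would collide too.

```java
import java.util.List;

// Hypothetical rack names illustrating why joining with ";" is ambiguous
// when ";" is itself a legal character inside a rack string.
public class RackJoinCollision {
    public static void main(String[] args) {
        List<String> racksA = List.of("us-east-1a;us-east-1b");    // ONE rack containing ';'
        List<String> racksB = List.of("us-east-1a", "us-east-1b"); // TWO distinct racks
        String joinedA = String.join(";", racksA);
        String joinedB = String.join(";", racksB);
        // Both inputs produce the identical string, hence identical hash input.
        System.out.println(joinedA.equals(joinedB)); // true
    }
}
```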



##
gradle/dependencies.gradle:
##
@@ -147,6 +148,7 @@ libs += [
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
   classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   This is something that we haven't really discussed in the KIP because it is 
an implementation detail but we should discuss whether we really want to take a 
dependency on Guava.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {

Review Comment:
   I wonder whether it is worth inlining the implementation from Guava or 
something similar to combine the hashes. It would avoid the extra collections. 
I am not sure whether it makes a real difference though. What are your thoughts?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##
@@ -209,4 +219,50 @@ void validateOffsetFetch(
 default boolean shouldExpire() {
 return true;
 }
+
+/**
+ * Computes the hash of the topics in a group.
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value 
is the topic hash.
+ * @return The hash of the group.
+ */
+static long computeGroupHash(Map<String, Long> topicHashes) {
+return Hashing.combineOrdered(
+topicHashes.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey())
+.map(e -> HashCode.fromLong(e.getValue()))
+.toList()
+).asLong();
+}
+
+/**
+ * Computes the hash of the topic id, name, number of partitions, and 
partition racks by Murmur3.
+ *
+ * @param topicImage   The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+static long computeTopicHash(TopicImage topicImage, ClusterImage 
clusterImage) {
+HashFunction hf = Hashing.murmur3_128();
+Hasher topicHasher = hf.newHasher()
+.putByte((byte) 0) // magic byte
+.putLong(topicImage.id().hashCode()) // topic Id
+.putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+.putInt(topicImage.partitions().size()); // number of partitio