dajac commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2088351150


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -324,4 +329,98 @@ static void throwIfRegularExpressionIsInvalid(
                     regex, ex.getDescription()));
         }
     }
+
+    /**
+     * The magic byte used to identify the version of topic hash function.
+     */
+    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+    /**
+     * Computes the hash of the topics in a group.
+     * <p>
+     * The computed hash value is stored as the metadata hash in the *GroupMetadataValue.
+     * <p>
+     * If there is no topic, the hash value is set to 0.
+     * The hashing process involves the following steps:
+     * 1. Sort the topic hashes by topic name.
+     * 2. Write each topic hash in order.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        if (topicHashes.isEmpty()) {
+            return 0;
+        }
+
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and partition racks by streaming XXH3.
+     * <p>
+     * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)}
+     * method and is stored as part of the metadata hash in the *GroupMetadataValue.
+     * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the
+     * new hash version.
+     * <p>
+     * For non-existent topics, the hash value is set to 0.
+     * For existent topics, the hashing process involves the following steps:
+     * 1. Write a magic byte to denote the version of the hash function.
+     * 2. Write the hash code of the topic ID.
+     * 3. Write the topic name.
+     * 4. Write the number of partitions associated with the topic.
+     * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
+     * - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
+     *
+     * @param topicName     The topic name.
+     * @param metadataImage The cluster image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(String topicName, MetadataImage metadataImage) {
+        TopicImage topicImage = metadataImage.topics().getTopic(topicName);
+        if (topicImage == null) {
+            return 0;
+        }
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        hasher = hasher
+            .putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte

Review Comment:
   I wonder whether we should remove the inline comments, as the code is pretty self-explanatory and you already explain the format in the Javadoc.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -324,4 +329,98 @@ static void throwIfRegularExpressionIsInvalid(
                     regex, ex.getDescription()));
         }
     }
+
+    /**
+     * The magic byte used to identify the version of topic hash function.
+     */
+    static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
+
+    /**
+     * Computes the hash of the topics in a group.
+     * <p>
+     * The computed hash value is stored as the metadata hash in the *GroupMetadataValue.
+     * <p>
+     * If there is no topic, the hash value is set to 0.
+     * The hashing process involves the following steps:
+     * 1. Sort the topic hashes by topic name.
+     * 2. Write each topic hash in order.
+     *
+     * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+     * @return The hash of the group.
+     */
+    static long computeGroupHash(Map<String, Long> topicHashes) {
+        if (topicHashes.isEmpty()) {
+            return 0;
+        }
+
+        // Sort entries by topic name
+        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>(topicHashes.entrySet());
+        sortedEntries.sort(Map.Entry.comparingByKey());
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        for (Map.Entry<String, Long> entry : sortedEntries) {
+            hasher.putLong(entry.getValue());
+        }
+
+        return hasher.getAsLong();
+    }
+
+    /**
+     * Computes the hash of the topic id, name, number of partitions, and partition racks by streaming XXH3.
+     * <p>
+     * The computed hash value for the topic is utilized in conjunction with the {@link #computeGroupHash(Map)}
+     * method and is stored as part of the metadata hash in the *GroupMetadataValue.
+     * It is important to note that if the hash algorithm is changed, the magic byte must be updated to reflect the
+     * new hash version.
+     * <p>
+     * For non-existent topics, the hash value is set to 0.
+     * For existent topics, the hashing process involves the following steps:
+     * 1. Write a magic byte to denote the version of the hash function.
+     * 2. Write the hash code of the topic ID.
+     * 3. Write the topic name.
+     * 4. Write the number of partitions associated with the topic.
+     * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
+     * - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
+     *
+     * @param topicName     The topic name.
+     * @param metadataImage The cluster image.
+     * @return The hash of the topic.
+     */
+    static long computeTopicHash(String topicName, MetadataImage metadataImage) {
+        TopicImage topicImage = metadataImage.topics().getTopic(topicName);
+        if (topicImage == null) {
+            return 0;
+        }
+
+        HashStream64 hasher = Hashing.xxh3_64().hashStream();
+        hasher = hasher
+            .putByte(TOPIC_HASH_MAGIC_BYTE) // magic byte
+            .putLong(topicImage.id().hashCode()) // topic ID

Review Comment:
   I wonder about this one. It may be better to hash it as two longs (getMostSignificantBits and getLeastSignificantBits). What do you think?
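   
   For illustration, a minimal sketch of what hashing the id as two longs could look like. It assumes the hash4j `Hashing`/`HashStream64` API used in the diff (package `com.dynatrace.hash4j.hashing` is assumed) and a `java.util.UUID`-style id that exposes `getMostSignificantBits()`/`getLeastSignificantBits()`; names like `TopicIdHashSketch` and `hashTopicId` are hypothetical:
   
   ```java
   import java.util.UUID;
   
   import com.dynatrace.hash4j.hashing.HashStream64;
   import com.dynatrace.hash4j.hashing.Hashing;
   
   public class TopicIdHashSketch {
   
       static final byte TOPIC_HASH_MAGIC_BYTE = 0x00;
   
       // Feed the full 128-bit topic id into the hash instead of its 32-bit hashCode().
       static long hashTopicId(UUID topicId) {
           HashStream64 hasher = Hashing.xxh3_64().hashStream();
           return hasher
               .putByte(TOPIC_HASH_MAGIC_BYTE)
               .putLong(topicId.getMostSignificantBits())
               .putLong(topicId.getLeastSignificantBits())
               .getAsLong();
       }
   
       public static void main(String[] args) {
           System.out.println(hashTopicId(UUID.randomUUID()));
       }
   }
   ```
   
   This keeps all 128 bits of the id in the hash input, whereas `hashCode()` collapses it to 32 bits before hashing.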



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
